/*
  Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <dlfcn.h>

#include "dht-common.h"
#include "tier.h"
#include "tier-common.h"
#include "syscall.h"
#include "events.h"
#include "tier-ctr-interface.h"

/* Database backend type used by the tier code; hard coded to SQLite3. */
static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;

/* Mutex for updating the data movement stats kept in gf_tier_conf_t
 * (block_size, blocks_total, blocks_used, percent_full). */
static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Stores the path location of promotion query files */
static char *promotion_qfile;
/* Stores the path location of demotion query files */
static char *demotion_qfile;

/* NOTE(review): libhandle is presumably the dlopen() handle of the gfdb
 * library (this file includes <dlfcn.h>) - confirm where it is set.
 * gfdb_methods is the table of gfdb entry points; this file calls
 * gfdb_methods.gfdb_read_query_record through it. */
static void *libhandle;
static gfdb_methods_t gfdb_methods;

/* NOTE(review): presumably the maximum size in bytes of one query
 * record; its use is not visible in this part of the file. */
#define DB_QUERY_RECORD_SIZE 4096

/*
 * Tear down a query-file array.
 *
 * Every descriptor slot that is still open (i.e. not -1) is closed, then
 * the descriptor table and the array object itself are released.
 * A NULL qfile_array is accepted.
 */
static void
qfile_array_free (tier_qfile_array_t *qfile_array)
{
        ssize_t slot = 0;

        if ((qfile_array != NULL) && (qfile_array->fd_array != NULL)) {
                for (slot = 0; slot < qfile_array->array_size; slot++) {
                        /* -1 marks a slot that was never opened or was
                         * already closed */
                        if (qfile_array->fd_array[slot] == -1) {
                                continue;
                        }
                        sys_close (qfile_array->fd_array[slot]);
                }
        }

        if (qfile_array != NULL) {
                GF_FREE (qfile_array->fd_array);
        }
        GF_FREE (qfile_array);
}


/* Create a new query file list with given size */
static tier_qfile_array_t *
qfile_array_new (ssize_t array_size)
{
        int ret                         = -1;
        tier_qfile_array_t *qfile_array   = NULL;
        ssize_t i                       = 0;

        GF_VALIDATE_OR_GOTO ("tier", (array_size > 0), out);

        qfile_array = GF_CALLOC (1, sizeof (tier_qfile_array_t),
                                gf_tier_mt_qfile_array_t);
        if (!qfile_array) {
                gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                        "Failed to allocate memory for tier_qfile_array_t");
                goto out;
        }

        qfile_array->fd_array = GF_CALLOC (array_size, sizeof (int),
                                        gf_dht_mt_int32_t);
        if (!qfile_array->fd_array) {
                gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                        "Failed to allocate memory for "
                        "tier_qfile_array_t->fd_array");
                goto out;
        }

        /* Init all the fds to -1 */
        for (i = 0; i < array_size; i++) {
                qfile_array->fd_array[i] = -1;
        }

        qfile_array->array_size = array_size;
        qfile_array->next_index = 0;

        /* Set exhausted count to list size as the list is empty */
        qfile_array->exhausted_count = qfile_array->array_size;

        ret = 0;
out:
        if (ret) {
                qfile_array_free (qfile_array);
                qfile_array = NULL;
        }
        return qfile_array;
}


/* Tells whether every slot of the query file list is exhausted (which is
 * also the state of a freshly created, empty list). */
static gf_boolean_t
is_qfile_array_empty (tier_qfile_array_t *qfile_array)
{
        if (qfile_array->exhausted_count == qfile_array->array_size) {
                return _gf_true;
        }

        return _gf_false;
}


/*
 * Advance next_index, wrapping around at the end of the array, until it
 * points at a slot whose descriptor is not -1 or until array_size steps
 * have been taken. Does nothing when the list is exhausted.
 */
static void
shift_next_index (tier_qfile_array_t *qfile_array)
{
        int candidate_fd = 0;
        int steps_taken  = 0;

        if (is_qfile_array_empty (qfile_array)) {
                return;
        }

        for (;;) {
                /* move to the following slot in a rotational manner */
                if (qfile_array->next_index ==
                    (qfile_array->array_size - 1)) {
                        qfile_array->next_index = 0;
                } else {
                        qfile_array->next_index++;
                }

                candidate_fd = qfile_array->fd_array[qfile_array->next_index];
                steps_taken++;

                if (candidate_fd != -1) {
                        break;
                }
                if (steps_taken >= qfile_array->array_size) {
                        break;
                }
        }
}

/*
 * Read the next query record from a list of query files, visiting the
 * files in a round-robin manner. This function is not thread safe.
 *
 * A file whose read returns <= 0 (EOF or a read error) is closed, its
 * slot is set to -1 and it is counted as exhausted; reading then moves
 * on to the next file.
 *
 * Returns:
 * > 0 the size of the query record that was read into *query_record.
 * 0   when every query file in the list is exhausted.
 * < 0 when qfile_array or its fd_array is NULL.
 */
static int
read_query_record_list (tier_qfile_array_t *qfile_array,
                          gfdb_query_record_t **query_record)
{
        int status     = -1;
        int current_fd = 0;

        GF_VALIDATE_OR_GOTO ("tier", qfile_array, out);
        GF_VALIDATE_OR_GOTO ("tier", qfile_array->fd_array, out);

        for (;;) {
                if (is_qfile_array_empty (qfile_array)) {
                        status = 0;
                        break;
                }

                current_fd = qfile_array->fd_array[qfile_array->next_index];
                status = gfdb_methods.gfdb_read_query_record
                                (current_fd, query_record);
                if (status > 0) {
                        /* Got a record; the next call starts at the
                         * following query file */
                        shift_next_index (qfile_array);
                        break;
                }

                /* EOF or error on this query file: retire it and try
                 * the next one */
                sys_close (current_fd);
                qfile_array->fd_array[qfile_array->next_index] = -1;
                qfile_array->exhausted_count++;
                shift_next_index (qfile_array);
        }
out:
        return status;
}


/* Check and update the watermark every WM_INTERVAL seconds.
 * NOTE(review): WM_INTERVAL_EMERG is presumably the shorter interval used
 * while in emergency demotion mode; its use is outside this part of the
 * file - confirm. */
#define WM_INTERVAL            5
#define WM_INTERVAL_EMERG      1

/*
 * Compare the node uuid stored on a file with the uuid of this node.
 *
 * The GF_XATTR_NODE_UUID_KEY xattr of loc is fetched, parsed as a uuid
 * and compared against defrag->node_uuid.
 *
 * Returns:
 *  0  the uuids are equal
 *  1  the uuids differ
 * -1  an argument was NULL, or the xattr could not be fetched or parsed
 */
static int
tier_check_same_node (xlator_t *this, loc_t *loc, gf_defrag_info_t *defrag)
{
        int         status                  = -1;
        dict_t     *xattr_rsp               = NULL;
        char       *uuid_text               = NULL;
        uuid_t      file_node_uuid          = {0,};

        GF_VALIDATE_OR_GOTO ("tier", this, out);
        GF_VALIDATE_OR_GOTO (this->name, loc, out);
        GF_VALIDATE_OR_GOTO (this->name, defrag, out);

        if (syncop_getxattr (this, loc, &xattr_rsp, GF_XATTR_NODE_UUID_KEY,
                             NULL, NULL) != 0) {
                gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                        "Unable to get NODE_UUID_KEY %s %s\n",
                        loc->name, loc->path);
                goto out;
        }

        if (dict_get_str (xattr_rsp, GF_XATTR_NODE_UUID_KEY,
                          &uuid_text) < 0) {
                gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                        "Failed to get node-uuids for %s", loc->path);
                goto out;
        }

        if (gf_uuid_parse (uuid_text, file_node_uuid) != 0) {
                gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                        "uuid_parse failed for %s", loc->path);
                goto out;
        }

        if (gf_uuid_compare (file_node_uuid, defrag->node_uuid) != 0) {
                gf_msg_debug (this->name, 0,
                              "%s does not belong to this node", loc->path);
                status = 1;
        } else {
                status = 0;
        }

out:
        if (xattr_rsp != NULL) {
                dict_unref (xattr_rsp);
        }

        return status;
}

/*
 * Refresh the usage statistics of the hot subvolume.
 *
 * Issues a statfs on conf->subvolumes[1] (with
 * GF_INTERNAL_IGNORE_DEEM_STATFS set in xdata) and stores block size,
 * total blocks, used blocks and the percentage of used blocks in
 * defrag->tier_conf while holding dm_stat_mutex.
 *
 * this     : the tier xlator; this->private must hold a dht_conf_t
 * root_loc : loc the statfs is issued on
 *
 * Returns 0 on success. Returns -1 when conf/defrag are missing, the
 * xdata dict cannot be prepared, or statfs reports zero total blocks;
 * returns the (non-zero) syncop_statfs result when the statfs fails.
 * On failure tier_conf is left untouched.
 */
int
tier_get_fs_stat (xlator_t *this, loc_t *root_loc)
{
        int                     ret = 0;
        gf_defrag_info_t       *defrag = NULL;
        dht_conf_t             *conf   = NULL;
        dict_t                 *xdata  = NULL;
        struct statvfs          statfs = {0, };
        gf_tier_conf_t         *tier_conf = NULL;


        conf = this->private;
        if (!conf) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "conf is NULL");
                ret = -1;
                goto exit;
        }

        defrag = conf->defrag;
        if (!defrag) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "defrag is NULL");
                ret = -1;
                goto exit;
        }

        tier_conf = &defrag->tier_conf;

        xdata = dict_new ();
        if (!xdata) {
                gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
                        DHT_MSG_NO_MEMORY,
                        "failed to allocate dictionary");
                ret = -1;
                goto exit;
        }

        ret = dict_set_int8 (xdata, GF_INTERNAL_IGNORE_DEEM_STATFS, 1);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_DICT_SET_FAILED,
                        "Failed to set "
                        GF_INTERNAL_IGNORE_DEEM_STATFS" in dict");
                ret = -1;
                goto exit;
        }

        /* Find how much free space is on the hot subvolume. The results
         * are stashed in the tier_conf data structure, where they are
         * later compared against the user defined watermarks. */
        ret = syncop_statfs (conf->subvolumes[1], root_loc, &statfs,
                             xdata, NULL);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, -ret,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Unable to obtain statfs.");
                goto exit;
        }

        /* The percentage below divides by the total block count; a
         * filesystem reporting zero blocks cannot be evaluated. This
         * mirrors the blocks_total check done before GF_PERCENTAGE in
         * tier_update_migration_counters. */
        if (statfs.f_blocks == 0) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "statfs reported zero total blocks. "
                        "Unable to compute hot tier usage.");
                ret = -1;
                goto exit;
        }

        pthread_mutex_lock (&dm_stat_mutex);

        tier_conf->block_size = statfs.f_bsize;
        tier_conf->blocks_total = statfs.f_blocks;
        tier_conf->blocks_used = statfs.f_blocks - statfs.f_bfree;

        tier_conf->percent_full = GF_PERCENTAGE(tier_conf->blocks_used,
                                                statfs.f_blocks);
        pthread_mutex_unlock (&dm_stat_mutex);

exit:
        if (xdata)
                dict_unref (xdata);
        return ret;
}

/*
 * Emit the event that corresponds to a watermark transition of volume
 * volname from old_wm to new_wm:
 *   LOW/NONE -> MID        : EVENT_TIER_WATERMARK_RAISED_TO_MID
 *   HI       -> MID        : EVENT_TIER_WATERMARK_DROPPED_TO_MID
 *   LOW/NONE/MID -> HI     : EVENT_TIER_WATERMARK_HI
 *   MID/HI   -> LOW        : EVENT_TIER_WATERMARK_DROPPED_TO_LOW
 * Any other combination emits nothing.
 */
static void
tier_send_watermark_event (const char *volname,
                           tier_watermark_op_t old_wm,
                           tier_watermark_op_t new_wm)
{
        gf_boolean_t was_low_or_none = _gf_false;
        gf_boolean_t was_mid         = _gf_false;
        gf_boolean_t was_hi          = _gf_false;

        if (old_wm == TIER_WM_LOW || old_wm == TIER_WM_NONE) {
                was_low_or_none = _gf_true;
        } else if (old_wm == TIER_WM_MID) {
                was_mid = _gf_true;
        } else if (old_wm == TIER_WM_HI) {
                was_hi = _gf_true;
        }

        if (new_wm == TIER_WM_MID) {
                if (was_low_or_none) {
                        gf_event (EVENT_TIER_WATERMARK_RAISED_TO_MID,
                                  "vol=%s", volname);
                } else if (was_hi) {
                        gf_event (EVENT_TIER_WATERMARK_DROPPED_TO_MID,
                                  "vol=%s", volname);
                }
        } else if (new_wm == TIER_WM_HI) {
                if (was_low_or_none || was_mid) {
                        gf_event (EVENT_TIER_WATERMARK_HI, "vol=%s", volname);
                }
        } else if (new_wm == TIER_WM_LOW) {
                if (was_mid || was_hi) {
                        gf_event (EVENT_TIER_WATERMARK_DROPPED_TO_LOW,
                                  "vol=%s", volname);
                }
        }
}

/*
 * Classify the current percent_full against the configured watermarks
 * and record the result in tier_conf->watermark_last.
 *
 *   percent_full < watermark_low  -> TIER_WM_LOW
 *   percent_full < watermark_hi   -> TIER_WM_MID
 *   otherwise                     -> TIER_WM_HI
 *
 * When the classification differs from the previous one, a watermark
 * event is sent and the change is logged.
 *
 * Returns 0 on success, -1 when this->private or its defrag is NULL.
 */
int
tier_check_watermark (xlator_t *this)
{
        dht_conf_t             *conf      = this->private;
        gf_tier_conf_t         *tier_conf = NULL;
        tier_watermark_op_t     current   = TIER_WM_HI;

        if ((conf == NULL) || (conf->defrag == NULL)) {
                return -1;
        }

        tier_conf = &conf->defrag->tier_conf;

        if (tier_conf->percent_full < tier_conf->watermark_low) {
                current = TIER_WM_LOW;
        } else if (tier_conf->percent_full < tier_conf->watermark_hi) {
                current = TIER_WM_MID;
        }

        /* Nothing to report while the watermark level is unchanged */
        if (current == tier_conf->watermark_last) {
                return 0;
        }

        tier_send_watermark_event (tier_conf->volname,
                                   tier_conf->watermark_last,
                                   current);

        tier_conf->watermark_last = current;
        gf_msg (this->name, GF_LOG_INFO, 0,
                DHT_MSG_LOG_TIER_STATUS,
                "Tier watermark now %d", current);

        return 0;
}


/* Tells whether the tier runs in watermark mode (TIER_MODE_WM) and the
 * last recorded watermark is TIER_WM_HI. A NULL tier_conf yields
 * _gf_false. */
static gf_boolean_t
is_hot_tier_full (gf_tier_conf_t *tier_conf)
{
        if (tier_conf == NULL) {
                return _gf_false;
        }

        if (tier_conf->mode != TIER_MODE_WM) {
                return _gf_false;
        }

        return (tier_conf->watermark_last == TIER_WM_HI) ? _gf_true
                                                         : _gf_false;
}

/*
 * Decide whether a migration should be attempted now.
 *
 * The watermark is re-evaluated first. Then:
 *   TIER_WM_LOW : only promotions are allowed
 *   TIER_WM_HI  : only demotions are allowed
 *   TIER_WM_MID : a random number in [0, 99] is drawn; promotion is
 *                 allowed when it is greater than percent_full,
 *                 demotion when it is less than or equal to it
 *
 * promote : non-zero when asking about a promotion, zero for a demotion
 *
 * Returns 1 when the migration should go ahead, 0 otherwise (including
 * when conf/defrag are missing or the watermark check fails).
 */
int
tier_do_migration (xlator_t *this, int promote)
{
        dht_conf_t             *conf      = this->private;
        gf_tier_conf_t         *tier_conf = NULL;
        long                    dice      = 0;
        int                     migrate   = 0;

        if ((conf == NULL) || (conf->defrag == NULL)) {
                return 0;
        }

        if (tier_check_watermark (this) != 0) {
                gf_msg (this->name, GF_LOG_CRITICAL, errno,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to get watermark");
                return 0;
        }

        tier_conf = &conf->defrag->tier_conf;

        switch (tier_conf->watermark_last) {
        case TIER_WM_LOW:
                migrate = (promote != 0);
                break;
        case TIER_WM_HI:
                migrate = (promote == 0);
                break;
        case TIER_WM_MID:
                /* coverity[DC.WEAK_CRYPTO] */
                dice = random() % 100;
                migrate = promote ? (dice > tier_conf->percent_full)
                                  : (dice <= tier_conf->percent_full);
                break;
        default:
                /* any other watermark value: do not migrate */
                break;
        }

        return migrate;
}

/* Set the promote_in_progress (is_promotion != 0) or demote_in_progress
 * flag of tier_conf to value while holding pause_mutex. */
static void
tier_mark_migration_in_progress (gf_tier_conf_t *tier_conf,
                                 int is_promotion, int value)
{
        pthread_mutex_lock (&tier_conf->pause_mutex);
        if (is_promotion) {
                tier_conf->promote_in_progress = value;
        } else {
                tier_conf->demote_in_progress = value;
        }
        pthread_mutex_unlock (&tier_conf->pause_mutex);
}

/*
 * Migrate one file by issuing a setxattr with migrate_data on loc.
 *
 * The matching in-progress flag in tier_conf is raised for the duration
 * of the setxattr and cleared afterwards, whatever its outcome.
 *
 * Returns the result of syncop_setxattr.
 */
int
tier_migrate (xlator_t *this, int is_promotion, dict_t *migrate_data,
              loc_t *loc, gf_tier_conf_t *tier_conf)
{
        int op_ret = -1;

        tier_mark_migration_in_progress (tier_conf, is_promotion, 1);

        /* Data migration */
        op_ret = syncop_setxattr (this, loc, migrate_data, 0,
                                  NULL, NULL);

        tier_mark_migration_in_progress (tier_conf, is_promotion, 0);

        return op_ret;
}

/*
 * Decide whether a file may be promoted to the hot tier.
 *
 * this      : xlator used for logging
 * file_name : name of the file, used in log messages only
 * current   : iatt of the file; ia_size and ia_gfid are read
 * defrag    : defrag info holding the tier configuration
 *
 * The file is rejected when
 *  - tier_max_promote_size is set and the file is larger than it, or
 *  - (watermark mode only) the hot tier usage statistics are not
 *    available yet (block_size or blocks_total is zero), or
 *  - (watermark mode only) the estimated block usage after promotion
 *    would reach the hi watermark.
 *
 * returns  _gf_true: if file can be promoted
 * returns _gf_false: if file cannot be promoted
 */
static gf_boolean_t
tier_can_promote_file (xlator_t *this, char const *file_name,
                       struct iatt *current, gf_defrag_info_t *defrag)
{
        gf_boolean_t ret = _gf_false;
        fsblkcnt_t estimated_usage = 0;

        if (defrag->tier_conf.tier_max_promote_size &&
            (current->ia_size > defrag->tier_conf.tier_max_promote_size)) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "File %s (gfid:%s) with size (%"PRIu64") exceeds "
                        "maxsize (%d) for promotion. File will not be "
                        "promoted.",
                        file_name,
                        uuid_utoa(current->ia_gfid),
                        (uint64_t) current->ia_size,
                        defrag->tier_conf.tier_max_promote_size);
                goto err;
        }

        /* bypass further validations for TEST mode */
        if (defrag->tier_conf.mode != TIER_MODE_WM) {
                ret = _gf_true;
                goto err;
        }

        /* Both values are used as divisors below. They stay zero until
         * the hot tier statistics have been collected, in which case no
         * estimate can be made and the promotion is refused. */
        if ((defrag->tier_conf.block_size == 0) ||
            (defrag->tier_conf.blocks_total == 0)) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Hot tier usage statistics are not available. "
                        "File will not be promoted.");
                goto err;
        }

        /* convert the file size to blocks as per the block size of the
         * destination tier
         * NOTE: adding (block_size - 1) before dividing rounds the
         *       block count up when the size is not a multiple of the
         *       block size
         */
        estimated_usage = ((current->ia_size + defrag->tier_conf.block_size - 1) /
                                defrag->tier_conf.block_size) +
                                defrag->tier_conf.blocks_used;

        /* test if the estimated block usage goes above HI watermark */
        if (GF_PERCENTAGE (estimated_usage, defrag->tier_conf.blocks_total) >=
                        defrag->tier_conf.watermark_hi) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Estimated block count consumption on "
                        "hot tier (%"PRIu64") exceeds hi watermark (%d%%). "
                        "File will not be promoted.",
                        (uint64_t) estimated_usage,
                        defrag->tier_conf.watermark_hi);
                goto err;
        }
        ret = _gf_true;
err:
        return ret;
}

/*
 * Fill migrate_data with the keys that turn a setxattr into a tier
 * migration request.
 *
 * Returns 0 when all keys were set, otherwise the non-zero result of
 * the dict_set_str call that failed.
 */
static int
tier_set_migrate_data (dict_t *migrate_data)
{
        int rc = 0;

        rc = dict_set_str (migrate_data, GF_XATTR_FILE_MIGRATE_KEY, "force");
        if (rc != 0) {
                return rc;
        }

        /* Flag to suggest the xattr call is from migrator */
        rc = dict_set_str (migrate_data, "from.migrator", "yes");
        if (rc != 0) {
                return rc;
        }

        /* Flag to suggest it is a tiering migration.
         * Promotions and demotions are multithreaded, so the original
         * frame from gf_defrag_start() is not carried along. A new
         * frame is created by syncop_setxattr() and it lacks the
         * frame->root->pid of the original frame. This key-value is
         * therefore passed with the syncop_setxattr() so that
         * dht_setxattr() can set frame->root->pid to
         * GF_CLIENT_PID_TIER_DEFRAG just before calling
         * dht_start_rebalance_task() */
        return dict_set_str (migrate_data, TIERING_MIGRATION_KEY, "yes");
}

/*
 * Look up the parent directory identified by p_loc->gfid and return its
 * path.
 *
 * The lookup is sent with GET_ANCESTRY_PATH_KEY in xdata; the path is
 * read from the same key of the response. par_stbuf receives the iatt
 * of the parent.
 *
 * Returns a newly allocated copy of the parent path which the caller
 * must release with GF_FREE, or NULL on failure. On failure
 * *per_link_status is set to 1 for a stale parent (-ESTALE from the
 * lookup) and to -1 for every other error.
 */
static char *
tier_get_parent_path (xlator_t          *this,
                      loc_t             *p_loc,
                      struct iatt       *par_stbuf,
                      int               *per_link_status)
{
        int     ret             = -1;
        char    *parent_path    = NULL;
        char    *path_from_dict = NULL;
        dict_t  *xdata_request  = NULL;
        dict_t  *xdata_response = NULL;


        xdata_request = dict_new ();
        if (!xdata_request) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to create xdata_request dict");
                *per_link_status = -1;
                goto err;
        }
        ret = dict_set_int32 (xdata_request,
                              GET_ANCESTRY_PATH_KEY, 42);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to set value to dict : key %s \n",
                        GET_ANCESTRY_PATH_KEY);
                *per_link_status = -1;
                goto err;
        }

        ret = syncop_lookup (this, p_loc, par_stbuf, NULL,
                             xdata_request, &xdata_response);
        /* When the parent gfid is a stale entry, the lookup
         * will fail and stop the demotion process.
         * The parent gfid can be stale when a huge folder is
         * deleted while the files within it are being migrated
         */
        if (ret == -ESTALE) {
                gf_msg (this->name, GF_LOG_WARNING, -ret,
                        DHT_MSG_STALE_LOOKUP,
                        "Stale entry in parent lookup for %s",
                        uuid_utoa (p_loc->gfid));
                *per_link_status = 1;
                goto err;
        } else if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, -ret,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Error in parent lookup for %s",
                        uuid_utoa (p_loc->gfid));
                *per_link_status = -1;
                goto err;
        }
        ret = dict_get_str (xdata_response, GET_ANCESTRY_PATH_KEY,
                            &path_from_dict);
        if (ret || !path_from_dict) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to get parent path for %s",
                        uuid_utoa (p_loc->gfid));
                *per_link_status = -1;
                goto err;
        }

        /* path_from_dict points into xdata_response, which is released
         * below; hand the caller its own copy so the path stays valid
         * after the dict is gone. */
        parent_path = gf_strdup (path_from_dict);
        if (!parent_path) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Memory "
                        "allocation failed for parent path of %s",
                        uuid_utoa (p_loc->gfid));
                *per_link_status = -1;
        }

err:
        if (xdata_request) {
                dict_unref (xdata_request);
        }

        if (xdata_response) {
                dict_unref (xdata_response);
                xdata_response = NULL;
        }

        return parent_path;
}

/*
 * Fill loc->name with a copy of link_info->file_name and loc->path with
 * "<parent_path>/<name>". gfid is used for logging only.
 *
 * Returns 0 on success. On allocation failure returns non-zero and sets
 * *per_link_status to -1. Strings already stored in loc are left for the
 * caller to release.
 */
static int
tier_get_file_name_and_path (xlator_t                   *this,
                             uuid_t                     gfid,
                             gfdb_link_info_t           *link_info,
                             char const                 *parent_path,
                             loc_t                      *loc,
                             int                        *per_link_status)
{
        int             ret     = -1;

        loc->name = gf_strdup (link_info->file_name);
        if (!loc->name) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Memory "
                        "allocation failed for %s",
                        uuid_utoa (gfid));
                *per_link_status = -1;
                goto err;
        }
        ret = gf_asprintf((char **)&(loc->path), "%s/%s", parent_path, loc->name);
        if (ret < 0) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed to "
                        "construct file path for %s %s\n",
                        parent_path, loc->name);
                *per_link_status = -1;
                goto err;
        }

        ret = 0;

err:
        return ret;
}

/*
 * Look up the file described by loc and store its iatt in current.
 *
 * Returns 0 on success. A stale entry (-ESTALE) sets *per_link_status
 * to 1 so the file is skipped; any other failure sets it to -1. In both
 * failure cases the non-zero lookup result is returned. The stale
 * warning logs the gfid of p_loc (the parent).
 */
static int
tier_lookup_file (xlator_t    *this,
                  loc_t       *p_loc,
                  loc_t       *loc,
                  struct iatt *current,
                  int         *per_link_status)
{
        int     ret = -1;

        ret = syncop_lookup (this, loc, current, NULL, NULL, NULL);

        /* The file may be deleted even when the parent
         * is available and the lookup will
         * return a stale entry which would stop the
         * migration. so if its a stale entry, then skip
         * the file and keep migrating.
         */
        if (ret == -ESTALE) {
                gf_msg (this->name, GF_LOG_WARNING, -ret,
                        DHT_MSG_STALE_LOOKUP,
                        "Stale lookup for %s",
                        uuid_utoa (p_loc->gfid));
                *per_link_status = 1;
                goto err;
        } else if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, -ret,
                        DHT_MSG_LOG_TIER_ERROR, "Failed to "
                        "lookup file %s\n", loc->name);
                *per_link_status = -1;
                goto err;
        }
        ret = 0;

err:
        return ret;
}

/*
 * Tell whether the file needs no migration because of where it is
 * cached.
 *
 * Returns _gf_true, and sets *per_link_status to 1, when src_subvol is
 * NULL, when a promotion finds the file on conf->subvolumes[1], or when
 * a demotion finds it on conf->subvolumes[0]. Returns _gf_false
 * otherwise, leaving *per_link_status untouched.
 */
static gf_boolean_t
tier_is_file_already_at_destination (xlator_t           *src_subvol,
                                     query_cbk_args_t   *query_cbk_args,
                                     dht_conf_t         *conf,
                                     int                *per_link_status)
{
        gf_boolean_t    at_destination = _gf_true;

        if (src_subvol == NULL) {
                *per_link_status = 1;
                goto err;
        }
        if (query_cbk_args->is_promotion &&
            src_subvol == conf->subvolumes[1]) {
                *per_link_status = 1;
                goto err;
        }

        if (!query_cbk_args->is_promotion &&
            src_subvol == conf->subvolumes[0]) {
                *per_link_status = 1;
                goto err;
        }
        at_destination = _gf_false;

err:
        return at_destination;
}

/*
 * Account for one successfully migrated file.
 *
 * Bumps the promoted/demoted file counter, adds the size of the last
 * promoted/demoted file to *total_migrated_bytes, adjusts
 * tier_conf.blocks_used by the same amount and, when blocks_total is
 * non-zero, recomputes percent_full. The tier_conf statistics are
 * updated under dm_stat_mutex. Finally *total_files is incremented.
 *
 * NOTE(review): st_last_promoted_size / st_last_demoted_size are applied
 * to blocks_used as-is - confirm they are expressed in blocks.
 */
static void
tier_update_migration_counters (query_cbk_args_t        *query_cbk_args,
                                gf_defrag_info_t        *defrag,
                                uint64_t                *total_migrated_bytes,
                                int                     *total_files)
{
        if (query_cbk_args->is_promotion) {
                defrag->total_files_promoted++;
                *total_migrated_bytes +=
                        defrag->tier_conf.st_last_promoted_size;
                pthread_mutex_lock (&dm_stat_mutex);
                defrag->tier_conf.blocks_used +=
                        defrag->tier_conf.st_last_promoted_size;
                pthread_mutex_unlock (&dm_stat_mutex);
        } else {
                defrag->total_files_demoted++;
                *total_migrated_bytes +=
                        defrag->tier_conf.st_last_demoted_size;
                pthread_mutex_lock (&dm_stat_mutex);
                defrag->tier_conf.blocks_used -=
                        defrag->tier_conf.st_last_demoted_size;
                pthread_mutex_unlock (&dm_stat_mutex);
        }
        /* Avoid dividing by a zero total */
        if (defrag->tier_conf.blocks_total) {
                pthread_mutex_lock (&dm_stat_mutex);
                defrag->tier_conf.percent_full =
                        GF_PERCENTAGE (defrag->tier_conf.blocks_used,
                                        defrag->tier_conf.blocks_total);
                pthread_mutex_unlock (&dm_stat_mutex);
        }

        (*total_files)++;
}

/*
 * Migrate the file behind one hard link recorded in the query file.
 *
 * Steps: resolve the parent path, build the loc of the file, look the
 * file up, check that it may be promoted (promotions only), skip it if
 * it already sits on the destination subvolume or does not belong to
 * this node, and finally migrate it and update the counters.
 *
 * *per_link_status is set to -1 on failure and to 1 when the link is to
 * be ignored (stale entry, promotion refused, already at destination,
 * file of another node); it is not modified on success or when tiering
 * is paused.
 *
 * Returns 0 when the file was migrated, belongs to another node, or
 * tiering is paused; a non-zero value otherwise. Independently of the
 * above, -1 is returned once *total_files or *total_migrated_bytes has
 * reached the per-cycle limit of tier_conf.
 */
static int
tier_migrate_link (xlator_t            *this,
                   dht_conf_t          *conf,
                   uuid_t              gfid,
                   gfdb_link_info_t    *link_info,
                   gf_defrag_info_t    *defrag,
                   query_cbk_args_t    *query_cbk_args,
                   dict_t              *migrate_data,
                   int                 *per_link_status,
                   int                 *total_files,
                   uint64_t            *total_migrated_bytes)
{
        int             ret             = -1;
        struct iatt     current         = {0,};
        struct iatt     par_stbuf       = {0,};
        loc_t           p_loc           = {0,};
        loc_t           loc             = {0,};
        xlator_t        *src_subvol     = NULL;
        inode_t         *linked_inode   = NULL;
        char            *parent_path    = NULL;


        /* Lookup for parent and get the path of parent */
        gf_uuid_copy (p_loc.gfid, link_info->pargfid);
        p_loc.inode = inode_new (defrag->root_inode->table);
        if (!p_loc.inode) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to create reference to inode"
                        " for %s", uuid_utoa (p_loc.gfid));

                *per_link_status = -1;
                goto err;
        }

        /* parent_path is owned by this function and freed at err */
        parent_path = tier_get_parent_path (this, &p_loc, &par_stbuf,
                                            per_link_status);
        if (!parent_path) {
                goto err;
        }

        linked_inode = inode_link (p_loc.inode, NULL, NULL, &par_stbuf);
        inode_unref (p_loc.inode);
        p_loc.inode = linked_inode;
        if (!p_loc.inode) {
                /* loc.parent is dereferenced further down */
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to link parent inode"
                        " for %s", uuid_utoa (p_loc.gfid));

                *per_link_status = -1;
                goto err;
        }


        /* Preparing File Inode */
        gf_uuid_copy (loc.gfid, gfid);
        loc.inode = inode_new (defrag->root_inode->table);
        if (!loc.inode) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to create reference to inode"
                        " for %s", uuid_utoa (loc.gfid));

                *per_link_status = -1;
                goto err;
        }
        gf_uuid_copy (loc.pargfid, link_info->pargfid);
        loc.parent = inode_ref (p_loc.inode);

        /* Get filename and Construct file path */
        if (tier_get_file_name_and_path (this, gfid, link_info,
                                         parent_path, &loc, per_link_status) != 0) {
                goto err;
        }
        gf_uuid_copy (loc.parent->gfid, link_info->pargfid);

        /* lookup file inode */
        if (tier_lookup_file (this, &p_loc, &loc,
                              &current, per_link_status) != 0) {
                goto err;
        }

        if (query_cbk_args->is_promotion) {
                if (!tier_can_promote_file (this,
                                            link_info->file_name,
                                            &current,
                                            defrag)) {
                        *per_link_status = 1;
                        goto err;
                }
        }

        linked_inode = inode_link (loc.inode, NULL, NULL, &current);
        inode_unref (loc.inode);
        loc.inode = linked_inode;


        /*
         * Do not promote/demote if file already is where it
         * should be. It means another brick moved the file
         * so is not an error. So we set per_link_status = 1
         * so that we ignore counting this.
         */
        src_subvol = dht_subvol_get_cached (this, loc.inode);

        if (tier_is_file_already_at_destination (src_subvol, query_cbk_args,
                                                 conf, per_link_status)) {
                goto err;
        }

        gf_msg_debug (this->name, 0, "Tier %s: src_subvol %s file %s",
                      (query_cbk_args->is_promotion ?  "promote" : "demote"),
                      src_subvol->name,
                      loc.path);


        ret = tier_check_same_node (this, &loc, defrag);
        if (ret != 0) {
                if (ret < 0) {
                        *per_link_status = -1;
                        goto err;
                }
                ret = 0;
                /* By setting per_link_status to 1 we are
                 * ignoring this status and will not be counting
                 * this file for migration */
                *per_link_status = 1;
                goto err;
        }

        gf_uuid_copy (loc.gfid, loc.inode->gfid);

        if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Tiering paused. "
                        "Exiting tier_migrate_link");
                goto err;
        }

        ret = tier_migrate (this, query_cbk_args->is_promotion,
                            migrate_data, &loc, &defrag->tier_conf);

        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, -ret,
                        DHT_MSG_LOG_TIER_ERROR, "Failed to "
                        "migrate %s ", loc.path);
                *per_link_status = -1;
                goto err;
        }

        tier_update_migration_counters (query_cbk_args, defrag,
                                        total_migrated_bytes, total_files);

        ret = 0;

err:
        GF_FREE ((char *) loc.name);
        loc.name = NULL;
        loc_wipe (&loc);
        loc_wipe (&p_loc);

        /* Copy returned by tier_get_parent_path; may be NULL here */
        GF_FREE (parent_path);
        parent_path = NULL;

        if ((*total_files >= defrag->tier_conf.max_migrate_files)
            || (*total_migrated_bytes > defrag->tier_conf.max_migrate_bytes)) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Reached cycle migration limit."
                        "migrated bytes %"PRIu64" files %d",
                        *total_migrated_bytes,
                        *total_files);
                ret = -1;
        }

        return ret;
}


static int
tier_migrate_using_query_file (void *_args)
{
        int ret                                 = -1;
        query_cbk_args_t *query_cbk_args        = (query_cbk_args_t *) _args;
        xlator_t *this                          = NULL;
        gf_defrag_info_t *defrag                = NULL;
        gfdb_query_record_t *query_record       = NULL;
        gfdb_link_info_t *link_info             = NULL;
        dict_t *migrate_data                    = NULL;
        /*
         * per_file_status and per_link_status
         *  0  : success
         * -1 : failure
         *  1  : ignore the status and don't count for migration
         * */
        int per_file_status                     = 0;
        int per_link_status                     = 0;
        int total_status                        = 0;
        dht_conf_t   *conf                      = NULL;
        uint64_t total_migrated_bytes           = 0;
        int total_files                         = 0;
        loc_t root_loc                          = { 0 };
        gfdb_time_t  start_time                 = { 0 };
        gfdb_time_t  current_time               = { 0 };
        int total_time                          = 0;
        int max_time                            = 0;
        gf_boolean_t emergency_demote_mode      = _gf_false;


        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
        this = query_cbk_args->this;
        GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
        GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->qfile_array, out);
        GF_VALIDATE_OR_GOTO (this->name, this->private, out);

        conf = this->private;

        defrag = query_cbk_args->defrag;
        migrate_data = dict_new ();
        if (!migrate_data)
                goto out;

        emergency_demote_mode = (!query_cbk_args->is_promotion &&
                                 is_hot_tier_full(&defrag->tier_conf));

        if (tier_set_migrate_data (migrate_data) != 0) {
                goto out;
        }

        dht_build_root_loc (defrag->root_inode, &root_loc);

        ret = gettimeofday (&start_time, NULL);
        if (query_cbk_args->is_promotion) {
                max_time = defrag->tier_conf.tier_promote_frequency;
        } else {
                max_time = defrag->tier_conf.tier_demote_frequency;
        }

        /* Per file */
        while ((ret = read_query_record_list (query_cbk_args->qfile_array,
              &query_record)) != 0) {

                if (ret < 0) {
                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "Failed to fetch query record "
                                "from query file");
                        goto out;
                }

                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
                        ret = -1;
                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "Exiting tier migration as"
                                "defrag status is not started");
                        goto out;
                }

                ret = gettimeofday (&current_time, NULL);
                if (ret < 0) {
                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "Could not get current time.");
                        goto out;
                }

                total_time = current_time.tv_sec - start_time.tv_sec;
                if (total_time > max_time) {
                        gf_msg (this->name, GF_LOG_INFO, 0,
                                DHT_MSG_LOG_TIER_STATUS,
                                "Max cycle time reached. Exiting migration.");
                        goto out;
                }

                per_file_status      = 0;
                per_link_status      = 0;

                if (gf_defrag_get_pause_state (&defrag->tier_conf)
                    != TIER_RUNNING) {
                        gf_msg (this->name, GF_LOG_INFO, 0,
                                DHT_MSG_LOG_TIER_STATUS,
                                "Tiering paused. "
                                "Exiting tier_migrate_using_query_file");
                        break;
                }

                if (defrag->tier_conf.mode == TIER_MODE_WM) {
                        ret = tier_get_fs_stat (this, &root_loc);
                        if (ret != 0) {
                                gfdb_methods.gfdb_query_record_free (query_record);
                                query_record = NULL;
                                gf_msg (this->name, GF_LOG_ERROR, 0,
                                        DHT_MSG_LOG_TIER_STATUS,
                                        "tier_get_fs_stat() FAILED ... "
                                        "skipping file migrations until next cycle");
                                break;
                        }

                        if (!tier_do_migration (this, query_cbk_args->is_promotion)) {
                                gfdb_methods.gfdb_query_record_free (query_record);
                                query_record = NULL;

                                /* We have crossed the high watermark. Stop processing
                                 * files if this is a promotion cycle so demotion gets
                                 * a chance to start if not already running*/

                                if (query_cbk_args->is_promotion &&
                                    is_hot_tier_full (&defrag->tier_conf)) {

                                        gf_msg (this->name, GF_LOG_INFO, 0,
                                                DHT_MSG_LOG_TIER_STATUS,
                                                "High watermark crossed during "
                                                "promotion. Exiting "
                                                "tier_migrate_using_query_file");
                                        break;
                                }
                                continue;
                        }
                }

                per_link_status = 0;

                /* For now we only support single link migration. And we will
                 * ignore other hard links in the link info list of query record
                 * TODO: Multiple hard links migration */
                if (!list_empty (&query_record->link_list)) {
                        link_info = list_first_entry (&query_record->link_list,
                                                      gfdb_link_info_t, list);
                }
                if (link_info != NULL) {
                        if (tier_migrate_link (this,
                                               conf,
                                               query_record->gfid,
                                               link_info,
                                               defrag,
                                               query_cbk_args,
                                               migrate_data,
                                               &per_link_status,
                                               &total_files,
                                               &total_migrated_bytes) != 0) {
                                gf_msg (this->name, GF_LOG_INFO, 0,
                                        DHT_MSG_LOG_TIER_STATUS,
                                        "%s failed for %s(gfid:%s)",
                                        (query_cbk_args->is_promotion ?
                                         "Promotion" : "Demotion"),
                                        link_info->file_name,
                                        uuid_utoa (query_record->gfid));
                        }
                }
                per_file_status = per_link_status;

                if (per_file_status < 0) {/* Failure */
                        pthread_mutex_lock (&dm_stat_mutex);
                        defrag->total_failures++;
                        pthread_mutex_unlock (&dm_stat_mutex);
                } else if (per_file_status == 0) {/* Success */
                        pthread_mutex_lock (&dm_stat_mutex);
                        defrag->total_files++;
                        pthread_mutex_unlock (&dm_stat_mutex);
                } else if (per_file_status == 1) {/* Ignore */
                        per_file_status = 0;
                        /* Since this attempt was ignored we
                         * decrement the lookup count*/
                        pthread_mutex_lock (&dm_stat_mutex);
                        defrag->num_files_lookedup--;
                        pthread_mutex_unlock (&dm_stat_mutex);
                }
                total_status = total_status + per_file_status;
                per_link_status = 0;
                per_file_status = 0;

                gfdb_methods.gfdb_query_record_free (query_record);
                query_record = NULL;

                /* If we are demoting and the entry watermark was HI, then
                 * we are done with emergency demotions if the current
                 * watermark has fallen below hi-watermark level
                 */
                if (emergency_demote_mode) {
                        if (tier_check_watermark (this) == 0) {
                                if (!is_hot_tier_full (&defrag->tier_conf)) {
                                        break;
                                }
                        }
                }
        }

out:
        if (migrate_data)
                dict_unref (migrate_data);

        gfdb_methods.gfdb_query_record_free (query_record);
        query_record = NULL;

        return total_status;
}


/*
 * Callback invoked once per record/file returned by a database query.
 *
 * gfdb_query_record : the record to persist
 * _args             : query_cbk_args_t * carrying the open query file
 *                     descriptor (query_fd) and the defrag info
 *
 * Appends the record to the query file and bumps the looked-up file
 * counter under dm_stat_mutex.
 *
 * Returns 0 on success, -1 on invalid arguments, or the non-zero result of
 * gfdb_write_query_record() when writing fails.
 */
static int
tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record,
                        void *_args)
{
        int ret = -1;
        query_cbk_args_t *query_cbk_args = _args;

        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out);
        /* Only negative values mean "not open": tier_process_self_query
         * treats open() < 0 as the failure case, and 0 is a descriptor
         * open() may legitimately hand out. */
        GF_VALIDATE_OR_GOTO ("tier", (query_cbk_args->query_fd >= 0), out);

        ret = gfdb_methods.gfdb_write_query_record (query_cbk_args->query_fd,
                                                        gfdb_query_record);
        if (ret) {
                gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
                        "Failed writing query record to query file");
                goto out;
        }

        /* The counter is shared with the migration threads */
        pthread_mutex_lock (&dm_stat_mutex);
        query_cbk_args->defrag->num_files_lookedup++;
        pthread_mutex_unlock (&dm_stat_mutex);

        ret = 0;
out:
        return ret;
}




/*
 * Create the query file for a brick from within the tier process.
 *
 * local_brick : brick whose database (brick_db_path) is queried and whose
 *               query file (qfile_path) is written
 * args        : gfdb_brick_info_t * holding the query time stamp, the
 *               promote/demote flag and the query_cbk_args_t
 *
 * Steps: open a db connection, open the query file, run the query that
 * matches the promote/demote mode (every matching record is written by
 * tier_gf_query_callback), then ask CTR on the brick to clear the heat
 * of the db entries.
 *
 * Returns 0 on success and a non-zero value on any failure.
 */
static int
tier_process_self_query (tier_brick_list_t *local_brick, void *args)
{
        int ret                                         = -1;
        char *db_path                                   = NULL;
        query_cbk_args_t *query_cbk_args                = NULL;
        xlator_t *this                                  = NULL;
        gf_defrag_info_t *defrag                        = NULL;
        gfdb_conn_node_t *conn_node                     = NULL;
        dict_t *params_dict                             = NULL;
        dict_t *ctr_ipc_dict                            = NULL;
        gfdb_brick_info_t *gfdb_brick_info              = args;

        /*Init of all the essentials*/
        GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_info , out);

        /* Validate the callback args before looking inside them */
        query_cbk_args = gfdb_brick_info->_query_cbk_args;
        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);

        /* Nothing is open yet. Without this the cleanup below would close
         * whatever value query_fd happened to hold on an early exit. */
        query_cbk_args->query_fd = -1;

        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
        this = query_cbk_args->this;

        GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
        defrag = query_cbk_args->defrag;

        GF_VALIDATE_OR_GOTO (this->name, local_brick, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out);

        db_path = local_brick->brick_db_path;

        /*Preparing DB parameters before init_db i.e getting db connection*/
        params_dict = dict_new ();
        if (!params_dict) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "DB Params cannot initialized");
                goto out;
        }
        SET_DB_PARAM_TO_DICT(this->name, params_dict,
                             (char *) gfdb_methods.get_db_path_key(),
                             db_path, ret, out);

        /* The macro is handed ret and may have overwritten it; make sure
         * the failure exits below do not report success. */
        ret = -1;

        /*Get the db connection*/
        conn_node = gfdb_methods.init_db ((void *)params_dict, dht_tier_db_type);
        if (!conn_node) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "FATAL: Failed initializing db operations");
                goto out;
        }

        /* Query for eligible files from db */
        query_cbk_args->query_fd = open (local_brick->qfile_path,
                        O_WRONLY | O_CREAT | O_APPEND,
                        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
        if (query_cbk_args->query_fd < 0) {
                gf_msg (this->name, GF_LOG_ERROR, errno,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to open query file %s",
                        local_brick->qfile_path);
                goto out;
        }

        if (!gfdb_brick_info->_gfdb_promote) {
                /* Demotion */
                if (defrag->tier_conf.watermark_last == TIER_WM_HI) {
                        /* emergency demotion mode */
                        ret = gfdb_methods.find_all (conn_node,
                                tier_gf_query_callback,
                                (void *)query_cbk_args,
                                defrag->tier_conf.query_limit);
                } else if (defrag->write_freq_threshold == 0 &&
                           defrag->read_freq_threshold == 0) {
                        /* No frequency thresholds: time based only */
                        ret = gfdb_methods.find_unchanged_for_time (
                                        conn_node,
                                        tier_gf_query_callback,
                                        (void *)query_cbk_args,
                                        gfdb_brick_info->time_stamp);
                } else {
                        ret = gfdb_methods.find_unchanged_for_time_freq (
                                conn_node,
                                tier_gf_query_callback,
                                (void *)query_cbk_args,
                                gfdb_brick_info->time_stamp,
                                defrag->write_freq_threshold,
                                defrag->read_freq_threshold,
                                _gf_false);
                }
        } else {
                /* Promotion */
                if (defrag->write_freq_threshold == 0 &&
                    defrag->read_freq_threshold == 0) {
                        /* No frequency thresholds: time based only */
                        ret = gfdb_methods.find_recently_changed_files (
                                conn_node,
                                tier_gf_query_callback,
                                (void *)query_cbk_args,
                                gfdb_brick_info->time_stamp);
                } else {
                        ret = gfdb_methods.find_recently_changed_files_freq (
                                conn_node,
                                tier_gf_query_callback,
                                (void *)query_cbk_args,
                                gfdb_brick_info->time_stamp,
                                defrag->write_freq_threshold,
                                defrag->read_freq_threshold,
                                _gf_false);
                }
        }
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "FATAL: query from db failed");
                        goto out;
        }

        /* ret is 0 after the successful query; every early exit from here
         * on is a failure and must not be reported as success. */
        ret = -1;

        /*Clear the heat on the DB entries*/
        /*Preparing ctr_ipc_dict*/
        ctr_ipc_dict = dict_new ();
        if (!ctr_ipc_dict) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "ctr_ipc_dict cannot initialized");
                goto out;
        }

        SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_dict,
                             GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_CLEAR_OPS,
                             ret, out);

        ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_dict,
                                                                        NULL);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed clearing the heat "
                        "on db %s error %d", local_brick->brick_db_path, ret);
                goto out;
        }

        ret = 0;
out:
        if (params_dict) {
                dict_unref (params_dict);
                params_dict = NULL;
        }

        if (ctr_ipc_dict) {
                dict_unref (ctr_ipc_dict);
                ctr_ipc_dict = NULL;
        }

        /* query_fd is only non-negative here if this call opened it */
        if (query_cbk_args && query_cbk_args->query_fd >= 0) {
                sys_close (query_cbk_args->query_fd);
                query_cbk_args->query_fd = -1;
        }
        gfdb_methods.fini_db (conn_node);

        return ret;
}





/*
 * Ask CTR on the brick to create the query file.
 *
 * local_brick : brick whose CTR xlator is asked to run the query and to
 *               write the result to local_brick->qfile_path
 * args        : gfdb_brick_info_t * holding the query time stamp, the
 *               promote/demote flag and the query_cbk_args_t
 *
 * The query parameters are packed into a gfdb_ipc_ctr_params_t and sent
 * through syncop_ipc(); the reply carries the number of records found,
 * which is stored in defrag->num_files_lookedup.
 *
 * Returns 0 on success and a non-zero value on any failure.
 */
static int
tier_process_ctr_query (tier_brick_list_t *local_brick, void *args)
{
        int ret                                         = -1;
        query_cbk_args_t *query_cbk_args                = NULL;
        xlator_t *this                                  = NULL;
        dict_t *ctr_ipc_in_dict                         = NULL;
        dict_t *ctr_ipc_out_dict                        = NULL;
        gfdb_brick_info_t *gfdb_brick_info              = args;
        gfdb_ipc_ctr_params_t *ipc_ctr_params           = NULL;
        int count                                       = 0;

        /*Init of all the essentials*/
        GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_info , out);

        /* Validate the callback args before looking inside them */
        query_cbk_args = gfdb_brick_info->_query_cbk_args;
        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);

        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
        this = query_cbk_args->this;

        GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);

        /* The time stamp is copied with memcpy() below */
        GF_VALIDATE_OR_GOTO (this->name, gfdb_brick_info->time_stamp, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out);


        /*Preparing ctr_ipc_in_dict*/
        ctr_ipc_in_dict = dict_new ();
        if (!ctr_ipc_in_dict) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "ctr_ipc_in_dict cannot initialized");
                goto out;
        }

        ipc_ctr_params = GF_CALLOC (1, sizeof (gfdb_ipc_ctr_params_t),
                                        gf_tier_mt_ipc_ctr_params_t);
        if (!ipc_ctr_params) {
                goto out;
        }

        /* set all the query params*/
        ipc_ctr_params->is_promote = gfdb_brick_info->_gfdb_promote;

        ipc_ctr_params->write_freq_threshold =
                query_cbk_args->defrag->write_freq_threshold;

        ipc_ctr_params->read_freq_threshold =
                query_cbk_args->defrag->read_freq_threshold;

        ipc_ctr_params->query_limit =
                query_cbk_args->defrag->tier_conf.query_limit;

        /* Demotion while the last seen watermark was HI */
        ipc_ctr_params->emergency_demote =
                (!gfdb_brick_info->_gfdb_promote &&
                 query_cbk_args->defrag->tier_conf.watermark_last ==
                        TIER_WM_HI);

        memcpy (&ipc_ctr_params->time_stamp,
                gfdb_brick_info->time_stamp,
                sizeof (gfdb_time_t));

        SET_DB_PARAM_TO_DICT (this->name, ctr_ipc_in_dict,
                              GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_QUERY_OPS,
                              ret, out);


        SET_DB_PARAM_TO_DICT (this->name, ctr_ipc_in_dict,
                              GFDB_IPC_CTR_GET_QFILE_PATH,
                              local_brick->qfile_path,
                              ret, out);

        ret = dict_set_bin (ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS,
                                ipc_ctr_params, sizeof (*ipc_ctr_params));
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
                        "Failed setting %s to params dictionary",
                        GFDB_IPC_CTR_GET_QUERY_PARAMS);
                /* ipc_ctr_params is still ours and is released exactly
                 * once, at out. Freeing it here as well would be a
                 * double free. */
                goto out;
        }
        /* The buffer now travels with the dict; keep the cleanup at out
         * from freeing it. */
        ipc_ctr_params = NULL;

        ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR,
                                ctr_ipc_in_dict, &ctr_ipc_out_dict);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_IPC_TIER_ERROR, "Failed query on %s ret %d",
                        local_brick->brick_db_path, ret);
                goto out;
        }

        ret = dict_get_int32(ctr_ipc_out_dict, GFDB_IPC_CTR_RET_QUERY_COUNT,
                                &count);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed getting count "
                        "of records on %s",
                        local_brick->brick_db_path);
                goto out;
        }

        /* A negative record count is how a failed query is reported */
        if (count < 0) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed query on %s",
                        local_brick->brick_db_path);
                ret = -1;
                goto out;
        }

        /* NOTE(review): this overwrites the counter, whereas
         * tier_gf_query_callback increments it per record. Confirm that
         * replacing rather than accumulating is intended. */
        pthread_mutex_lock (&dm_stat_mutex);
        query_cbk_args->defrag->num_files_lookedup = count;
        pthread_mutex_unlock (&dm_stat_mutex);

        ret = 0;
out:

        if (ctr_ipc_in_dict) {
                dict_unref(ctr_ipc_in_dict);
                ctr_ipc_in_dict = NULL;
        }

        if (ctr_ipc_out_dict) {
                dict_unref(ctr_ipc_out_dict);
                ctr_ipc_out_dict = NULL;
        }

        GF_FREE (ipc_ctr_params);

        return ret;
}




/*
 * Callback run for each brick of the hot/cold brick list. It makes sure
 * the files eligible for migration on that brick end up in the brick's
 * query file.
 *
 * local_brick : the brick to process
 * args        : passed through unchanged to the query functions
 *
 * For a sqlite3 database the brick's CTR is first asked for the db
 * "journal_mode". When the mode starts with "wal" the tier process queries
 * the db itself (tier_process_self_query); otherwise CTR is asked to run
 * the query (tier_process_ctr_query). Other db types are always queried
 * by the tier process.
 *
 * Returns 0 on success and a non-zero value on failure.
 */
static int
tier_process_brick (tier_brick_list_t *local_brick, void *args)
{
        int ret = -1;
        dict_t *ctr_ipc_in_dict = NULL;
        dict_t *ctr_ipc_out_dict = NULL;
        char *strval = NULL;

        GF_VALIDATE_OR_GOTO ("tier", local_brick, out);

        GF_VALIDATE_OR_GOTO ("tier", local_brick->xlator, out);

        if (dht_tier_db_type != GFDB_SQLITE3) {
                ret = tier_process_self_query (local_brick, args);
                goto out;
        }

        /*Preparing ctr_ipc_in_dict*/
        ctr_ipc_in_dict = dict_new ();
        if (!ctr_ipc_in_dict) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "ctr_ipc_in_dict cannot initialized");
                goto out;
        }

        ret = dict_set_str (ctr_ipc_in_dict, GFDB_IPC_CTR_KEY,
                            GFDB_IPC_CTR_GET_DB_PARAM_OPS);
        if (ret) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        LG_MSG_SET_PARAM_FAILED, "Failed to set %s "
                        "to params dictionary", GFDB_IPC_CTR_KEY);
                goto out;
        }

        ret = dict_set_str (ctr_ipc_in_dict,
                            GFDB_IPC_CTR_GET_DB_PARAM_OPS, "");
        if (ret) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        LG_MSG_SET_PARAM_FAILED, "Failed to set %s "
                        "to params dictionary",
                        GFDB_IPC_CTR_GET_DB_PARAM_OPS);
                goto out;
        }

        ret = dict_set_str (ctr_ipc_in_dict,
                            GFDB_IPC_CTR_GET_DB_KEY, "journal_mode");
        if (ret) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        LG_MSG_SET_PARAM_FAILED, "Failed to set %s "
                        "to params dictionary",
                        GFDB_IPC_CTR_GET_DB_KEY);
                goto out;
        }

        ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR,
                        ctr_ipc_in_dict, &ctr_ipc_out_dict);
        if (ret || ctr_ipc_out_dict == NULL) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed to get "
                        "journal_mode of sql db %s",
                        local_brick->brick_db_path);
                /* A "successful" call without a reply dict is still a
                 * failure; do not let ret == 0 leak out as success. */
                if (!ret)
                        ret = -1;
                goto out;
        }

        ret = dict_get_str (ctr_ipc_out_dict, "journal_mode", &strval);
        if (ret) {
                /* strval is not valid here; report the key that was
                 * looked up. */
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        LG_MSG_GET_PARAM_FAILED, "Failed to get %s "
                        "from params dictionary", "journal_mode");
                goto out;
        }

        /* Prefix match: any journal mode that starts with "wal" is queried
         * by the tier process itself, everything else goes through CTR. */
        if (strval && (strncmp(strval, "wal", strlen ("wal")) == 0)) {
                ret = tier_process_self_query (local_brick, args);
        } else {
                ret = tier_process_ctr_query (local_brick, args);
        }

out:
        if  (ctr_ipc_in_dict)
                dict_unref (ctr_ipc_in_dict);

        if  (ctr_ipc_out_dict)
                dict_unref (ctr_ipc_out_dict);

        return ret;
}




/*
 * Build the query file of every brick in args->brick_list.
 *
 * args           : migration arguments; freq_time gives how far back in
 *                  time (in seconds) the query window reaches
 * query_cbk_args : handed on to the per-brick query through
 *                  gfdb_brick_info_t
 * is_promotion   : selects the promotion or the demotion query file path
 *
 * A brick whose query fails is logged and skipped; the remaining bricks
 * are still processed.
 *
 * Returns -1 if the current time cannot be read, 0 otherwise.
 */
static int
tier_build_migration_qfile (migration_args_t *args,
                            query_cbk_args_t *query_cbk_args,
                            gf_boolean_t is_promotion)
{
        gfdb_time_t               now;
        gfdb_time_t               cutoff;
        gfdb_brick_info_t         brick_info;
        tier_brick_list_t         *brick = NULL;
        int                       brick_idx = 0;

        if (gettimeofday (&now, NULL) == -1) {
                gf_msg (args->this->name, GF_LOG_ERROR, errno,
                        DHT_MSG_SYS_CALL_GET_TIME_FAILED,
                        "Failed to get current time");
                return -1;
        }

        /* The query window starts freq_time seconds before now */
        cutoff.tv_sec = args->freq_time;
        cutoff.tv_sec = now.tv_sec - cutoff.tv_sec;

        /* The migration daemon may wake up a varying number of usec after
         * its sleep ends. A file registered in CTR X usec after the daemon
         * started would be missed in the following cycle when that cycle
         * starts Y usec past the period, with Y > X. Always using 0 usec
         * normalizes this away. */
        cutoff.tv_usec = 0;

        brick_info.time_stamp = &cutoff;
        brick_info._gfdb_promote = is_promotion;
        brick_info._query_cbk_args = query_cbk_args;

        list_for_each_entry (brick, args->brick_list, list) {

                /* Query file path of this brick:
                 *   /var/run/gluster/xlator_name/
                 *   {promote/demote}-brickname-indexinbricklist
                 * The index keeps the paths distinct even when two bricks
                 * share a name. */
                snprintf (brick->qfile_path, PATH_MAX , "%s-%s-%d",
                          GET_QFILE_PATH (brick_info._gfdb_promote),
                          brick->brick_name, brick_idx);

                /* Start from a clean slate: drop a query file left over
                 * from an earlier cycle */
                sys_unlink (brick->qfile_path);

                if (tier_process_brick (brick, &brick_info)) {
                        gf_msg (args->this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_BRICK_QUERY_FAILED,
                                "Brick %s query failed\n",
                                brick->brick_db_path);
                }

                brick_idx++;
        }

        return 0;
}

/*
 * Migrate files using the per-brick query files of a brick list.
 *
 * Opens the query file of every brick in comp->brick_list, rotates the
 * starting query-file index remembered in the tier configuration (a separate
 * index is kept for promotion and for demotion) and hands the resulting
 * fd array to tier_migrate_using_query_file().
 *
 * A query file that cannot be opened is logged, its negative fd is stored in
 * the array and it is counted in exhausted_count.
 *
 * When the migration reports an error, every query file is renamed to
 * "<qfile_path>-<UTC timestamp>.err" so it can be inspected later.
 *
 * The fd array is always freed and query_cbk_args->qfile_array is reset to
 * NULL before returning.
 *
 * Returns the result of tier_migrate_using_query_file(), or -1 when the
 * query file array could not be created.
 */
static int
tier_migrate_files_using_qfile (migration_args_t *comp,
                                query_cbk_args_t *query_cbk_args)
{
        int ret                                 = -1;
        tier_brick_list_t *local_brick          = NULL;
        gfdb_time_t current_time                = {0,};
        ssize_t qfile_array_size                = 0;
        int count                               = 0;
        int temp_fd                             = -1;
        gf_tier_conf_t  *tier_conf              = NULL;

        tier_conf = &(query_cbk_args->defrag->tier_conf);

        /* Taken up front; only used to timestamp the .err files below */
        gettimeofday (&current_time, NULL);

        /* One fd slot is needed per brick in the list */
        list_for_each_entry (local_brick, comp->brick_list, list) {
                qfile_array_size++;
        }
        query_cbk_args->qfile_array = qfile_array_new (qfile_array_size);
        if (!query_cbk_args->qfile_array) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed to create new "
                        "qfile_array");
                goto out;
        }

        /* Open all qfiles. The files are only read, so no creation mode
         * is passed to open(). */
        count = 0;
        query_cbk_args->qfile_array->exhausted_count = 0;
        list_for_each_entry (local_brick, comp->brick_list, list) {
                temp_fd = open (local_brick->qfile_path, O_RDONLY);
                if (temp_fd < 0) {
                        gf_msg ("tier", GF_LOG_ERROR, errno,
                                DHT_MSG_LOG_TIER_ERROR, "Failed to open "
                                "%s to the query file",
                                local_brick->qfile_path);
                        query_cbk_args->qfile_array->exhausted_count++;
                }
                /* A failed open is recorded as well, keeping slot and
                 * brick position aligned */
                query_cbk_args->qfile_array->fd_array[count] = temp_fd;
                count++;
        }

        /* Move the query file index to the next one, so that the same
         * query file is not used as the first one every cycle */
        query_cbk_args->qfile_array->next_index =
                        (query_cbk_args->is_promotion) ?
                        tier_conf->last_promote_qfile_index :
                        tier_conf->last_demote_qfile_index;
        shift_next_index (query_cbk_args->qfile_array);
        if (query_cbk_args->is_promotion) {
                tier_conf->last_promote_qfile_index =
                        query_cbk_args->qfile_array->next_index;
        } else {
                tier_conf->last_demote_qfile_index =
                        query_cbk_args->qfile_array->next_index;
        }

        /* Migrate files using query file list */
        ret = tier_migrate_using_query_file ((void *)query_cbk_args);
out:
        qfile_array_free (query_cbk_args->qfile_array);

        /* If there is an error rename all the query files to .err files
         * with a timestamp for better debugging */
        if (ret) {
                struct tm tm                            = {0,};
                char time_str[128]                      = {0,};
                char query_file_path_err[PATH_MAX]      = {0,};
                int32_t len                             = 0;

                /* Time format for error query files */
                gmtime_r (&current_time.tv_sec, &tm);
                strftime (time_str, sizeof (time_str), "%F-%T", &tm);

                list_for_each_entry (local_brick, comp->brick_list, list) {
                        /* rename error qfile; skipped when the new name
                         * would not fit into the buffer */
                        len = snprintf (query_file_path_err,
                                        sizeof (query_file_path_err),
                                        "%s-%s.err", local_brick->qfile_path,
                                        time_str);
                        if ((len >= 0) &&
                            ((size_t) len < sizeof (query_file_path_err))) {
                                if (sys_rename (local_brick->qfile_path,
                                                query_file_path_err) == -1)
                                        gf_msg_debug ("tier", 0, "rename "
                                                      "failed");
                        }
                }
        }

        query_cbk_args->qfile_array = NULL;

        return ret;
}



/*
 * Run one demotion cycle for the bricks in demotion_args->brick_list.
 *
 * Builds the per-brick query files and then migrates the files listed in
 * them. The result is returned and, when demotion_args is valid, also stored
 * in demotion_args->return_value.
 *
 * Returns 0 on success and non-zero on failure (-1 for invalid arguments).
 */
int
tier_demote (migration_args_t *demotion_args)
{
        /* Zeroed so that members not set below are never indeterminate */
        query_cbk_args_t query_cbk_args = {0,};
        int ret = -1;

        GF_VALIDATE_OR_GOTO ("tier", demotion_args, out);
        GF_VALIDATE_OR_GOTO ("tier", demotion_args->this, out);
        GF_VALIDATE_OR_GOTO (demotion_args->this->name,
                             demotion_args->brick_list, out);
        GF_VALIDATE_OR_GOTO (demotion_args->this->name,
                             demotion_args->defrag, out);

        THIS = demotion_args->this;

        query_cbk_args.this = demotion_args->this;
        query_cbk_args.defrag = demotion_args->defrag;
        query_cbk_args.is_promotion = 0;

        /*Build the query file using bricklist*/
        ret = tier_build_migration_qfile (demotion_args, &query_cbk_args,
                                          _gf_false);
        if (ret)
                goto out;

        /* Migrate files using the query file */
        ret = tier_migrate_files_using_qfile (demotion_args,
                                              &query_cbk_args);

out:
        /* The first validation jumps here with a NULL demotion_args */
        if (demotion_args)
                demotion_args->return_value = ret;
        return ret;
}


/*
 * Run one promotion cycle for the bricks in promotion_args->brick_list.
 *
 * Builds the per-brick query files and then migrates the files listed in
 * them. The result is returned and, when promotion_args is valid, also
 * stored in promotion_args->return_value.
 *
 * Returns 0 on success and non-zero on failure (-1 for invalid arguments).
 */
int
tier_promote (migration_args_t *promotion_args)
{
        int ret = -1;
        /* Zeroed so that members not set below are never indeterminate */
        query_cbk_args_t query_cbk_args = {0,};

        GF_VALIDATE_OR_GOTO ("tier", promotion_args, out);
        GF_VALIDATE_OR_GOTO ("tier", promotion_args->this, out);
        GF_VALIDATE_OR_GOTO (promotion_args->this->name,
                             promotion_args->brick_list, out);
        GF_VALIDATE_OR_GOTO (promotion_args->this->name,
                             promotion_args->defrag, out);

        THIS = promotion_args->this;

        query_cbk_args.this = promotion_args->this;
        query_cbk_args.defrag = promotion_args->defrag;
        query_cbk_args.is_promotion = 1;

        /*Build the query file using bricklist*/
        ret = tier_build_migration_qfile (promotion_args, &query_cbk_args,
                                          _gf_true);
        if (ret)
                goto out;

        /* Migrate files using the query file */
        ret = tier_migrate_files_using_qfile (promotion_args, &query_cbk_args);

out:
        /* The first validation jumps here with a NULL promotion_args */
        if (promotion_args)
                promotion_args->return_value = ret;
        return ret;
}


/*
 * Command the CTR on a brick to compact the local database using an IPC.
 *
 * local_brick: brick whose xlator receives the IPC; its brick_db_path is
 *              used to open a database connection for the duration of the
 *              call.
 * args:        a gfdb_brick_info_t whose _query_cbk_args supplies the
 *              xlator and the defrag info holding the compaction flags.
 *
 * The IPC dictionary carries "compact_active", "compact_mode_switched" and
 * the GFDB_IPC_CTR_KEY / GFDB_IPC_CTR_SET_COMPACT_PRAGMA pair.
 *
 * Returns 0 on success, non-zero on any failure.
 */
static int
tier_process_self_compact (tier_brick_list_t *local_brick, void *args)
{
        int ret                                         = -1;
        char *db_path                                   = NULL;
        query_cbk_args_t *query_cbk_args                = NULL;
        xlator_t *this                                  = NULL;
        gfdb_conn_node_t *conn_node                     = NULL;
        dict_t *params_dict                             = NULL;
        dict_t *ctr_ipc_dict                            = NULL;
        gfdb_brick_info_t *gfdb_brick_info              = args;

        /*Init of all the essentials*/
        GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_info , out);
        query_cbk_args = gfdb_brick_info->_query_cbk_args;

        /* The callback args must be checked before they are dereferenced */
        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);

        GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
        this = query_cbk_args->this;

        /* defrag is dereferenced below to read the compaction flags */
        GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out);

        GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out);

        db_path = local_brick->brick_db_path;

        /*Preparing DB parameters before init_db i.e getting db connection*/
        params_dict = dict_new ();
        if (!params_dict) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "DB Params cannot initialized");
                goto out;
        }
        SET_DB_PARAM_TO_DICT (this->name, params_dict,
                             (char *) gfdb_methods.get_db_path_key(), db_path,
                             ret, out);

        /*Get the db connection*/
        conn_node = gfdb_methods.init_db ((void *)params_dict,
                                          dht_tier_db_type);
        if (!conn_node) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "FATAL: Failed initializing db operations");
                /* ret was handed to the macro above and may have been
                 * overwritten; make sure the failure is reported */
                ret = -1;
                goto out;
        }

        /*Preparing ctr_ipc_dict*/
        ctr_ipc_dict = dict_new ();
        if (!ctr_ipc_dict) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "ctr_ipc_dict cannot initialized");
                ret = -1;
                goto out;
        }

        ret = dict_set_int32 (ctr_ipc_dict, "compact_active",
                              query_cbk_args->defrag->
                              tier_conf.compact_active);

        if (ret) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        LG_MSG_SET_PARAM_FAILED, "Failed to set %s "
                        "to params dictionary",
                        "compact_active");
                goto out;
        }

        ret = dict_set_int32 (ctr_ipc_dict, "compact_mode_switched",
                              query_cbk_args->defrag->
                              tier_conf.compact_mode_switched);

        if (ret) {
                gf_msg ("tier", GF_LOG_ERROR, 0,
                        LG_MSG_SET_PARAM_FAILED, "Failed to set %s "
                        "to params dictionary",
                        "compact_mode_switched");
                goto out;
        }

        /* The macro may jump to out; preset ret so that such a jump can
         * never be mistaken for success */
        ret = -1;
        SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_dict,
                             GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_SET_COMPACT_PRAGMA,
                             ret, out);

        gf_msg (this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
                "Starting Compaction IPC");

        ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_dict,
                          NULL);

        gf_msg (this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
                "Ending Compaction IPC");

        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR, "Failed compaction "
                        "on db %s error %d", local_brick->brick_db_path, ret);
                goto out;
        }

        gf_msg (this->name, GF_LOG_TRACE, 0, DHT_MSG_LOG_TIER_STATUS,
                "SUCCESS: %s Compaction", local_brick->brick_name);

        ret = 0;
out:
        if (params_dict) {
                dict_unref (params_dict);
                params_dict = NULL;
        }

        if (ctr_ipc_dict) {
                dict_unref (ctr_ipc_dict);
                ctr_ipc_dict = NULL;
        }

        /* Only tear down a connection that was actually established */
        if (conn_node)
                gfdb_methods.fini_db (conn_node);

        return ret;
}

/*
 * Per-brick callback used while walking a hot/cold brick list.
 * Checks that the brick and its xlator are present and then asks
 * tier_process_self_compact() to issue the compaction IPC for it.
 *
 * Returns 0 on success; -1 for an invalid brick, otherwise the non-zero
 * result of tier_process_self_compact().
 */
static int
tier_compact_db_brick (tier_brick_list_t *local_brick, void *args)
{
        int status = -1;

        GF_VALIDATE_OR_GOTO ("tier", local_brick, done);
        GF_VALIDATE_OR_GOTO ("tier", local_brick->xlator, done);

        status = tier_process_self_compact (local_brick, args);
        if (status != 0) {
                /* Reported at INFO level; the caller logs the error */
                gf_msg ("tier", GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Brick %s did not compact",
                        local_brick->brick_name);
        }

done:
        return status;
}

/*
 * Ask every brick in args->brick_list to compact its database.
 *
 * A per-brick failure is logged and does not stop the walk; the function
 * returns 0 once the list has been processed. It returns -1 only when the
 * current time cannot be read.
 */
static int
tier_send_compact (migration_args_t *args,
                   query_cbk_args_t *query_cbk_args)
{
        gfdb_time_t               now;
        gfdb_time_t               cutoff;
        gfdb_brick_info_t         brick_info;
        tier_brick_list_t         *brick = NULL;
        int                       ret = -1;

        ret = gettimeofday (&now, NULL);
        if (ret == -1) {
                gf_msg (args->this->name, GF_LOG_ERROR, errno,
                        DHT_MSG_SYS_CALL_GET_TIME_FAILED,
                        "Failed to get current time");
                goto out;
        }

        /* The cutoff lies freq_time seconds before now. The microsecond
           part is forced to 0: the daemon wakes a varying number of usec
           after its sleep ends, and a file registered in CTR X usec after
           one cycle started would be missed by the next cycle if that one
           starts Y usec after the period, with Y > X. */
        cutoff.tv_sec = args->freq_time;
        cutoff.tv_sec = now.tv_sec - cutoff.tv_sec;
        cutoff.tv_usec = 0;

        brick_info.time_stamp = &cutoff;
        /* The promotion flag is borrowed here to mean "always compact" */
        brick_info._gfdb_promote = 1;
        brick_info._query_cbk_args = query_cbk_args;

        list_for_each_entry (brick, args->brick_list, list) {

                gf_msg (args->this->name, GF_LOG_TRACE, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Start compaction for %s",
                        brick->brick_name);

                ret = tier_compact_db_brick (brick, &brick_info);
                if (ret != 0) {
                        gf_msg (args->this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_BRICK_QUERY_FAILED,
                                "Brick %s compaction failed\n",
                                brick->brick_db_path);
                }

                gf_msg (args->this->name, GF_LOG_TRACE, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "End compaction for %s",
                        brick->brick_name);
        }

        /* Individual brick failures were logged above and are not fatal */
        ret = 0;
out:
        return ret;
}

/*
 * Run one compaction cycle for the bricks in the given migration args.
 *
 * args is a migration_args_t. The compaction pragma is sent to every brick
 * on its brick list. The result is returned and, when args is valid, also
 * stored in its return_value member.
 *
 * Returns 0 on success and non-zero on failure (-1 for invalid arguments).
 */
static int
tier_compact (void *args)
{
        int ret = -1;
        /* Zeroed so that members not set below are never indeterminate */
        query_cbk_args_t query_cbk_args = {0,};
        migration_args_t *compaction_args = args;

        GF_VALIDATE_OR_GOTO ("tier", compaction_args, out);
        GF_VALIDATE_OR_GOTO ("tier", compaction_args->this, out);
        GF_VALIDATE_OR_GOTO (compaction_args->this->name,
                             compaction_args->brick_list, out);
        GF_VALIDATE_OR_GOTO (compaction_args->this->name,
                             compaction_args->defrag, out);

        THIS = compaction_args->this;

        query_cbk_args.this = compaction_args->this;
        query_cbk_args.defrag = compaction_args->defrag;
        query_cbk_args.is_compaction = 1;

        /* Send the compaction pragma out to all the bricks on the bricklist. */
        /* tier_get_bricklist ensures all bricks on the list are local to */
        /* this node. */
        ret = tier_send_compact (compaction_args, &query_cbk_args);

out:
        /* The first validation jumps here with a NULL compaction_args */
        if (compaction_args)
                compaction_args->return_value = ret;
        return ret;
}

/*
 * Recursively collect the bricks below xl that are local to this node.
 *
 * For every "protocol/client" xlator whose "remote-host" option is a local
 * address, a tier_brick_list_t entry is appended to local_bricklist_head.
 * The entry records the xlator, the brick name (last path component of the
 * "remote-subvolume" option) and the database path
 * "<remote-subvolume>/<GF_HIDDEN_PATH>/<brickname>.db".
 * All other xlators are handled by descending into their children.
 *
 * Returns 0 on success and non-zero on failure. An entry that was being
 * built when the failure occurred is freed; entries already added to the
 * list stay there.
 */
static int
tier_get_bricklist (xlator_t *xl, struct list_head *local_bricklist_head)
{
        xlator_list_t           *child = NULL;
        char                    *rv        = NULL;
        char                    *rh        = NULL;
        char                    *brickname = NULL;
        char                    *last_slash = NULL;
        char                    db_name[PATH_MAX] = "";
        int                     ret = 0;
        tier_brick_list_t       *local_brick = NULL;
        int32_t                 len = 0;

        GF_VALIDATE_OR_GOTO ("tier", xl, out);
        GF_VALIDATE_OR_GOTO ("tier", local_bricklist_head, out);

        /*
         * This function obtains remote subvolumes and filters out only
         * those running on the same node as the tier daemon.
         */
        if (strcmp(xl->type, "protocol/client") == 0) {
                ret = dict_get_str (xl->options, "remote-host", &rh);
                if (ret < 0)
                        goto out;

                if (gf_is_local_addr (rh)) {

                        local_brick = GF_CALLOC (1, sizeof(tier_brick_list_t),
                                                 gf_tier_mt_bricklist_t);
                        if (!local_brick) {
                                /* ret is still 0 from dict_get_str; an
                                 * allocation failure must not look like
                                 * success */
                                ret = -1;
                                goto out;
                        }

                        ret = dict_get_str (xl->options, "remote-subvolume",
                                           &rv);
                        if (ret < 0)
                                goto out;

                        /* strrchr returns NULL when rv holds no '/'; use
                         * the whole string as the brick name then */
                        last_slash = strrchr (rv, '/');
                        brickname = last_slash ? (last_slash + 1) : rv;
                        snprintf(db_name, sizeof(db_name), "%s.db",
                                 brickname);

                        local_brick->brick_db_path =
                                GF_CALLOC (PATH_MAX, 1, gf_common_mt_char);
                        if (!local_brick->brick_db_path) {
                                gf_msg ("tier", GF_LOG_ERROR, 0,
                                        DHT_MSG_LOG_TIER_STATUS,
                                        "Failed to allocate memory for"
                                        " bricklist.");
                                ret = -1;
                                goto out;
                        }

                        len = snprintf(local_brick->brick_db_path,
                                       PATH_MAX, "%s/%s/%s", rv,
                                       GF_HIDDEN_PATH, db_name);
                        if ((len < 0) || (len >= PATH_MAX)) {
                                gf_msg ("tier", GF_LOG_ERROR, EINVAL,
                                        DHT_MSG_LOG_TIER_STATUS,
                                        "DB path too long");
                                ret = -1;
                                goto out;
                        }

                        local_brick->xlator = xl;

                        snprintf (local_brick->brick_name,
                                NAME_MAX, "%s", brickname);

                        /* The entry is handed over to the list from here */
                        list_add_tail (&(local_brick->list),
                                        local_bricklist_head);

                        ret = 0;
                        goto out;
                }
        }

        for (child = xl->children; child; child = child->next) {
                ret = tier_get_bricklist (child->xlator, local_bricklist_head);
                if (ret) {
                        goto out;
                }
        }

        ret = 0;
out:

        /* On failure release an entry that never made it into the list */
        if (ret) {
                if (local_brick) {
                        GF_FREE (local_brick->brick_db_path);
                }
                GF_FREE (local_brick);
        }

        return ret;
}

/*
 * Demotion period to use: DEFAULT_DEMOTE_DEGRADED while the tier runs in
 * watermark mode and the last watermark seen was TIER_WM_HI, otherwise the
 * configured tier_demote_frequency.
 */
int
tier_get_freq_demote (gf_tier_conf_t *tier_conf)
{
        if ((tier_conf->mode != TIER_MODE_WM) ||
            (tier_conf->watermark_last != TIER_WM_HI)) {
                return tier_conf->tier_demote_frequency;
        }

        return DEFAULT_DEMOTE_DEGRADED;
}

/* Returns the configured promotion frequency of the tier. */
int
tier_get_freq_promote (gf_tier_conf_t *tier_conf)
{
        int promote_freq = tier_conf->tier_promote_frequency;

        return promote_freq;
}

/* Returns the configured compaction frequency of the hot tier. */
int
tier_get_freq_compact_hot (gf_tier_conf_t *tier_conf)
{
        int hot_compact_freq = tier_conf->tier_compact_hot_frequency;

        return hot_compact_freq;
}

/* Returns the configured compaction frequency of the cold tier. */
int
tier_get_freq_compact_cold (gf_tier_conf_t *tier_conf)
{
        int cold_compact_freq = tier_conf->tier_compact_cold_frequency;

        return cold_compact_freq;
}

/*
 * Tells whether a demotion cycle is due: _gf_true when the seconds part of
 * current_time is a multiple of freq, _gf_false otherwise.
 * A freq of 0 never triggers.
 */
static int
tier_check_demote (gfdb_time_t  current_time, int freq)
{
        /* The modulo below is undefined for a zero divisor */
        if (freq == 0)
                return _gf_false;

        return ((current_time.tv_sec % freq) == 0) ?
                _gf_true : _gf_false;
}

/*
 * Tells whether a promotion cycle is due.
 *
 * Never due while the tier runs in watermark mode and the last watermark
 * seen was TIER_WM_HI. Otherwise due when the seconds part of current_time
 * is a multiple of freq. A freq of 0 never triggers.
 */
static gf_boolean_t
tier_check_promote (gf_tier_conf_t   *tier_conf,
                    gfdb_time_t  current_time,
                    int freq)
{
        if ((tier_conf->mode == TIER_MODE_WM) &&
            (tier_conf->watermark_last == TIER_WM_HI))
                return _gf_false;

        /* The modulo below is undefined for a zero divisor */
        if (freq == 0)
                return _gf_false;

        return ((current_time.tv_sec % freq) == 0) ?
                _gf_true : _gf_false;
}

/*
 * Tells whether a compaction cycle is due.
 *
 * Never due unless compact_active or compact_mode_switched is set in the
 * tier configuration. Otherwise due when the seconds part of current_time
 * is a multiple of freq_compact. A freq_compact of 0 never triggers.
 */
static gf_boolean_t
tier_check_compact (gf_tier_conf_t   *tier_conf,
                    gfdb_time_t  current_time,
                    int freq_compact)
{

        if (!(tier_conf->compact_active ||
              tier_conf->compact_mode_switched))
                return _gf_false;

        /* The modulo below is undefined for a zero divisor */
        if (freq_compact == 0)
                return _gf_false;

        return ((current_time.tv_sec % freq_compact) == 0) ?
                _gf_true : _gf_false;
}


/*
 * Empties a brick list: every entry is unlinked from the list and freed
 * together with its brick_db_path. The list head itself is left in place.
 */
void
clear_bricklist (struct list_head *brick_list)
{
        tier_brick_list_t  *entry      = NULL;
        tier_brick_list_t  *next_entry = NULL;

        if (!list_empty(brick_list)) {
                /* _safe variant: entries are deleted while walking */
                list_for_each_entry_safe (entry, next_entry, brick_list,
                                          list) {
                        list_del (&entry->list);
                        GF_FREE (entry->brick_db_path);
                        GF_FREE (entry);
                }
        }
}


/*
 * Fills in qfile_path for every brick of the list.
 *
 * The path has the form
 *   <query file prefix>-<brick_name>-<position in the list>
 * where the prefix is picked by GET_QFILE_PATH (is_cold). Appending the
 * position keeps the paths distinct even when bricks share a name.
 */
static void
set_brick_list_qpath (struct list_head *brick_list, gf_boolean_t is_cold)
{
        tier_brick_list_t *brick = NULL;
        int brick_index          = 0;

        GF_VALIDATE_OR_GOTO ("tier", brick_list, done);

        list_for_each_entry (brick, brick_list, list) {
                snprintf (brick->qfile_path, PATH_MAX , "%s-%s-%d",
                          GET_QFILE_PATH (is_cold),
                          brick->brick_name, brick_index);
                brick_index += 1;
        }

done:
        return;
}

/*
 * Decide whether a compaction cycle is due for the tier described by args
 * and, if so, run it.
 *
 * The compaction frequency and the "mode switched" flag are taken from the
 * hot or the cold settings depending on args->is_hot_tier. After a
 * successful compaction the matching compact_mode_switched_{hot,cold} flag
 * is cleared.
 *
 * Returns 0 when no compaction was due or the compaction succeeded, and the
 * non-zero result of tier_compact() when it failed.
 */
static int
tier_prepare_compact (migration_args_t *args, gfdb_time_t current_time)
{
        xlator_t *this = NULL;
        dht_conf_t *conf                        = NULL;
        gf_defrag_info_t *defrag                = NULL;
        gf_tier_conf_t *tier_conf               = NULL;
        gf_boolean_t is_hot_tier = args->is_hot_tier;
        int freq = 0;
        /* Nothing to do is not an error */
        int ret = 0;
        const char *tier_type = is_hot_tier ? "hot" : "cold";

        this = args->this;

        conf = this->private;

        defrag = conf->defrag;

        tier_conf = &defrag->tier_conf;

        freq = is_hot_tier ? tier_get_freq_compact_hot (tier_conf) :
                tier_get_freq_compact_cold (tier_conf);

        /* Publish the per-tier flag through the common member */
        defrag->tier_conf.compact_mode_switched = is_hot_tier ?
                defrag->tier_conf.compact_mode_switched_hot :
                defrag->tier_conf.compact_mode_switched_cold;

        gf_msg(this->name, GF_LOG_TRACE, 0,
               DHT_MSG_LOG_TIER_STATUS,
               "Compact mode %i",
               defrag->tier_conf.compact_mode_switched);

        if (tier_check_compact (tier_conf, current_time,
                                freq)) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "Start compaction on %s tier",
                        tier_type);

                args->freq_time = freq;
                ret = tier_compact (args);
                if (ret) {
                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR, "Compaction failed on "
                                "%s tier", tier_type);
                        goto out;
                }

                gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                        "End compaction on %s tier", tier_type);

                /* The mode switch has been handled for this tier */
                if (is_hot_tier) {
                        defrag->tier_conf.compact_mode_switched_hot =
                                _gf_false;
                } else {
                        defrag->tier_conf.compact_mode_switched_cold =
                                _gf_false;
                }
        }

out:
        return ret;
}

/*
 * Interval between watermark checks: WM_INTERVAL_EMERG when running in
 * watermark mode with the TIER_WM_HI watermark, WM_INTERVAL otherwise.
 */
static int
tier_get_wm_interval(tier_mode_t mode, tier_watermark_op_t wm)
{
        int interval = WM_INTERVAL;

        if ((mode == TIER_MODE_WM) && (wm == TIER_WM_HI)) {
                interval = WM_INTERVAL_EMERG;
        }

        return interval;
}

/*
 * Main tiering loop. This is the start routine of the threads spawned in
 * tier_start(); in_args is the migration_args_t of the thread.
 *
 * Every second, wake from sleep to perform tasks.
 * 1. Check for state changes (graph switch, status change, detach, pause).
 * 2. In watermark mode, refresh the file system statistics and the
 *    watermark when the check interval has elapsed.
 * 3. Check the trigger and run promotion, compaction or demotion,
 *    depending on the flags in the args.
 *
 * The loop is only left through an exit condition. Always returns NULL;
 * the last status is stored in args->return_value when args is valid.
 */
static void
*tier_run (void *in_args)
{
        dht_conf_t *conf                        = NULL;
        gfdb_time_t  current_time               = { 0 };
        int freq                                = 0;
        int ret                                 = 0;
        xlator_t *any                           = NULL;
        xlator_t *xlator                        = NULL;
        gf_tier_conf_t *tier_conf               = NULL;
        loc_t root_loc                          = { 0 };
        int check_watermark                     = 0;
        gf_defrag_info_t *defrag                = NULL;
        xlator_t  *this                         = NULL;
        migration_args_t *args = in_args;
        GF_VALIDATE_OR_GOTO ("tier", args, out);
        GF_VALIDATE_OR_GOTO ("tier", args->brick_list, out);

        this = args->this;
        GF_VALIDATE_OR_GOTO ("tier", this, out);

        conf   = this->private;
        GF_VALIDATE_OR_GOTO ("tier", conf, out);

        defrag = conf->defrag;
        GF_VALIDATE_OR_GOTO ("tier", defrag, out);

        if (list_empty (args->brick_list)) {
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Brick list for tier is empty. Exiting.");
                goto out;
        }

        defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
        tier_conf = &defrag->tier_conf;

        dht_build_root_loc (defrag->root_inode, &root_loc);

        while (1) {

                /*
                 * Check if a graph switch occurred. If so, stop migration
                 * thread. It will need to be restarted manually.
                 */
                any = THIS->ctx->active->first;
                xlator = xlator_search_by_name (any, this->name);

                if (xlator != this) {
                        gf_msg (this->name, GF_LOG_INFO, 0,
                                DHT_MSG_LOG_TIER_STATUS,
                                "Detected graph switch. Exiting migration "
                                "daemon.");
                        goto out;
                }

                gf_defrag_check_pause_tier (tier_conf);

                sleep(1);

                /* Any status other than "started" ends the thread */
                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
                        ret = 1;
                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "defrag->defrag_status != "
                                "GF_DEFRAG_STATUS_STARTED");
                        goto out;
                }

                /* A detach request completes the tiering run */
                if (defrag->cmd == GF_DEFRAG_CMD_START_DETACH_TIER ||
                    defrag->cmd == GF_DEFRAG_CMD_DETACH_START) {
                        ret = 0;
                        defrag->defrag_status =
                                GF_DEFRAG_STATUS_COMPLETE;
                        gf_msg (this->name, GF_LOG_DEBUG, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "defrag->defrag_cmd == "
                                "GF_DEFRAG_CMD_START_DETACH_TIER");
                        goto out;
                }

                /* Skip this tick unless the tier is running */
                if (gf_defrag_get_pause_state (&defrag->tier_conf) !=
                    TIER_RUNNING)
                        continue;

                /* To have proper synchronization amongst all
                 * brick holding nodes, so that promotion and demotions
                 * start atomically w.r.t promotion/demotion frequency
                 * period, all nodes should have their system time
                 * in-sync with each other either manually set or
                 * using a NTP server*/
                ret = gettimeofday (&current_time, NULL);
                if (ret == -1) {
                        gf_msg (this->name, GF_LOG_ERROR, errno,
                                DHT_MSG_SYS_CALL_GET_TIME_FAILED,
                                "Failed to get current time");
                        goto out;
                }

                check_watermark++;

                /* emergency demotion requires frequent watermark monitoring */
                if (check_watermark >=
                        tier_get_wm_interval(tier_conf->mode,
                                             tier_conf->watermark_last)) {
                        check_watermark = 0;
                        if (tier_conf->mode == TIER_MODE_WM) {
                                ret = tier_get_fs_stat (this, &root_loc);
                                if (ret != 0) {
                                        continue;
                                }
                                ret = tier_check_watermark (this);
                                if (ret != 0) {
                                        gf_msg (this->name, GF_LOG_CRITICAL, errno,
                                                DHT_MSG_LOG_TIER_ERROR,
                                                "Failed to get watermark");
                                        continue;
                                }
                        }
                }

                if (args->is_promotion) {
                        freq = tier_get_freq_promote (tier_conf);

                        if (tier_check_promote (tier_conf, current_time, freq)) {
                                args->freq_time = freq;
                                ret = tier_promote (args);
                                if (ret) {
                                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                                DHT_MSG_LOG_TIER_ERROR,
                                                "Promotion failed");
                                }
                        }
                } else if (args->is_compaction) {
                        tier_prepare_compact (args, current_time);
                } else {
                        freq = tier_get_freq_demote (tier_conf);

                        if (tier_check_demote (current_time, freq)) {
                                args->freq_time = freq;
                                ret = tier_demote (args);
                                if (ret) {
                                        gf_msg (this->name,
                                                GF_LOG_ERROR,
                                                0,
                                                DHT_MSG_LOG_TIER_ERROR,
                                                "Demotion failed");
                                }
                        }
                }

                /* Check the statfs immediately after the processing threads
                   return */
                check_watermark = WM_INTERVAL;
        }

out:

        /* The first validation jumps here with a NULL args */
        if (args)
                args->return_value = ret;

        return NULL;
}

/*
 * Runs the tier migration workers (tier_init_methods installs this function
 * as the migration_other method).
 *
 * conf->subvolumes[1] supplies the hot brick list and conf->subvolumes[0]
 * the cold one.  Four tier_run threads are spawned, in this order:
 * demotion (hot bricks), promotion (cold bricks), hot-tier compaction and
 * cold-tier compaction.  The call then blocks until every thread that was
 * successfully spawned has exited and finally clears both brick lists.
 *
 * When a thread cannot be spawned, defrag->defrag_status is set to
 * GF_DEFRAG_STATUS_FAILED, no further thread is started, and the threads
 * that are already running are still joined.
 *
 * Returns the result of the last gf_thread_create() call that was made
 * (0 when all four threads were spawned).  The return_value each worker
 * stores in its migration_args_t is not examined here.
 */
int
tier_start (xlator_t *this, gf_defrag_info_t *defrag)
{
        dht_conf_t       *conf        = this->private;
        struct list_head  hot_bricks  = { 0 };
        struct list_head  cold_bricks = { 0 };
        pthread_t         demote_tid;
        pthread_t         promote_tid;
        pthread_t         hot_compact_tid;
        pthread_t         cold_compact_tid;
        int               ret         = -1;

        /* Members that are not named below stay zero-initialised. */
        migration_args_t demote_args = {
                .this          = this,
                .brick_list    = &hot_bricks,
                .defrag        = defrag,
                .is_promotion  = _gf_false,
                .is_compaction = _gf_false,
        };
        migration_args_t promote_args = {
                .this          = this,
                .brick_list    = &cold_bricks,
                .defrag        = defrag,
                .is_promotion  = _gf_true,
        };
        migration_args_t hot_compact_args = {
                .this          = this,
                .brick_list    = &hot_bricks,
                .defrag        = defrag,
                .is_promotion  = _gf_false,
                .is_compaction = _gf_true,
                .is_hot_tier   = _gf_true,
        };
        migration_args_t cold_compact_args = {
                .this          = this,
                .brick_list    = &cold_bricks,
                .defrag        = defrag,
                .is_promotion  = _gf_false,
                .is_compaction = _gf_true,
                .is_hot_tier   = _gf_false,
        };

        INIT_LIST_HEAD (&hot_bricks);
        INIT_LIST_HEAD (&cold_bricks);

        /* Hot tier: populate its brick list, then start demotion. */
        tier_get_bricklist (conf->subvolumes[1], &hot_bricks);
        set_brick_list_qpath (&hot_bricks, _gf_false);

        ret = gf_thread_create (&demote_tid, NULL, &tier_run,
                                &demote_args, "tierdem");
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to start demotion thread.");
                defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
                goto cleanup;
        }

        /* Cold tier: populate its brick list, then start promotion. */
        tier_get_bricklist (conf->subvolumes[0], &cold_bricks);
        set_brick_list_qpath (&cold_bricks, _gf_true);

        ret = gf_thread_create (&promote_tid, NULL, &tier_run,
                                &promote_args, "tierpro");
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to start promotion thread.");
                defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
                goto join_demote;
        }

        ret = gf_thread_create (&hot_compact_tid, NULL, &tier_run,
                                &hot_compact_args, "tierhcom");
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to start compaction thread.");
                defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
                goto join_promote;
        }

        ret = gf_thread_create (&cold_compact_tid, NULL, &tier_run,
                                &cold_compact_args, "tierccom");
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Failed to start compaction thread.");
                defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
                goto join_hot_compact;
        }

        /* Join in reverse spawn order; each label deliberately falls
         * through to the next so earlier threads are always reaped. */
        pthread_join (cold_compact_tid, NULL);

join_hot_compact:
        pthread_join (hot_compact_tid, NULL);

join_promote:
        pthread_join (promote_tid, NULL);

join_demote:
        pthread_join (demote_tid, NULL);

cleanup:
        clear_bricklist (&cold_bricks);
        clear_bricklist (&hot_bricks);
        return ret;
}

/*
 * Tells whether the current rebalance command is a tier migration.
 *
 * Returns 1 when conf->defrag->cmd is GF_DEFRAG_CMD_START_TIER or
 * GF_DEFRAG_CMD_START_DETACH_TIER.  Returns 0 for any other command, and
 * also when the translator has no private data or no defrag state.
 */
int32_t
tier_migration_needed (xlator_t *this)
{
        dht_conf_t *conf   = this->private;
        int         needed = 0;

        GF_VALIDATE_OR_GOTO (this->name, conf, out);
        GF_VALIDATE_OR_GOTO (this->name, conf->defrag, out);

        switch (conf->defrag->cmd) {
        case GF_DEFRAG_CMD_START_TIER:
        case GF_DEFRAG_CMD_START_DETACH_TIER:
                needed = 1;
                break;
        default:
                break;
        }

out:
        return needed;
}

/*
 * Chooses the subvolume a file should be migrated to and stores it in
 * local->rebalance.target_node.
 *
 *  - While a detach-tier command is running the target is always
 *    conf->subvolumes[0].
 *  - Otherwise the target is the subvolume the file is NOT cached on:
 *    subvolumes[1] when it is cached on subvolumes[0], else subvolumes[0].
 *
 * Returns 0 when a non-NULL target was selected, -1 otherwise (including
 * when `this` or its private data is missing).
 */
int32_t
tier_migration_get_dst (xlator_t *this, dht_local_t *local)
{
        dht_conf_t       *conf   = NULL;
        gf_defrag_info_t *defrag = NULL;
        xlator_t         *target = NULL;
        int32_t           status = -1;

        GF_VALIDATE_OR_GOTO ("tier", this, out);
        GF_VALIDATE_OR_GOTO (this->name, this->private, out);

        conf   = this->private;
        defrag = conf->defrag;

        if (defrag && defrag->cmd == GF_DEFRAG_CMD_START_DETACH_TIER) {
                target = conf->subvolumes[0];
        } else if (conf->subvolumes[0] == local->cached_subvol) {
                target = conf->subvolumes[1];
        } else {
                target = conf->subvolumes[0];
        }

        local->rebalance.target_node = target;

        if (target)
                status = 0;

out:
        return status;
}

/*
 * layout_search method for the tier translator: returns
 * TIER_HASHED_SUBVOL regardless of `layout` and `name`, or NULL when
 * `this` or its private data is missing.
 */
xlator_t *
tier_search (xlator_t *this, dht_layout_t *layout, const char *name)
{
        /* NOTE(review): TIER_HASHED_SUBVOL is defined outside this chunk.
         * `conf` is not otherwise read here, so the macro presumably
         * expands in terms of a local with exactly this name - keep it. */
        dht_conf_t *conf   = NULL;
        xlator_t   *hashed = NULL;

        GF_VALIDATE_OR_GOTO ("tier", this, out);
        GF_VALIDATE_OR_GOTO (this->name, this->private, out);

        conf   = this->private;
        hashed = TIER_HASHED_SUBVOL;

out:
        return hashed;
}


/*
 * Loads libgfdb and fills the file-scope gfdb_methods table.
 *
 * The library is opened into a local handle and only published to the
 * file-scope `libhandle` once get_gfdb_methods() has been resolved and
 * called.  On any failure the local handle is closed and `libhandle` is
 * left untouched, so it never holds a handle that has already been closed
 * (tier_fini calls dlclose() on whatever `libhandle` contains).
 *
 * Returns 0 on success, -1 on failure.
 */
static int
tier_load_externals (xlator_t *this)
{
        int                 ret              = -1;
        const char         *libpathfull      = (LIBDIR "/libgfdb.so.0");
        void               *handle           = NULL;
        get_gfdb_methods_t  get_gfdb_methods = NULL;

        GF_VALIDATE_OR_GOTO ("tier", this, out);

        handle = dlopen (libpathfull, RTLD_NOW);
        if (!handle) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       DHT_MSG_LOG_TIER_ERROR,
                       "Error loading libgfdb.so %s\n", dlerror());
                goto out;
        }

        get_gfdb_methods = dlsym (handle, "get_gfdb_methods");
        if (!get_gfdb_methods) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       DHT_MSG_LOG_TIER_ERROR,
                       "Error loading get_gfdb_methods()");
                goto out;
        }

        get_gfdb_methods (&gfdb_methods);

        /* Success: hand ownership of the handle to the file-scope
         * variable so the error path below does not close it. */
        libhandle = handle;
        handle    = NULL;
        ret       = 0;

out:
        if (handle)
                dlclose (handle);

        return ret;
}

/*
 * Maps the "tier-mode" option string to a tier_mode_t: exactly "test"
 * selects TIER_MODE_TEST, every other string selects TIER_MODE_WM.
 * `mode` must not be NULL (it is passed straight to strcmp).
 */
static tier_mode_t
tier_validate_mode (char *mode)
{
        return (strcmp (mode, "test") == 0) ? TIER_MODE_TEST : TIER_MODE_WM;
}

/*
 * Maps the "tier-compact" option string to a boolean: exactly "on" yields
 * _gf_true, every other string yields _gf_false.  `mode` must not be NULL
 * (it is passed straight to strcmp).
 *
 * Both the input and the result are traced at INFO level; the result
 * trace is a status message, not an error condition.
 */
static gf_boolean_t
tier_validate_compact_mode (char *mode)
{
        gf_boolean_t ret = _gf_false;

        gf_msg ("tier", GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS,
                "tier_validate_compact_mode: mode = %s", mode);

        if (!strcmp (mode, "on")) {
                ret = _gf_true;
        }

        gf_msg ("tier", GF_LOG_INFO, 0,
                DHT_MSG_LOG_TIER_STATUS,
                "tier_validate_compact_mode: ret = %i", ret);

        return ret;
}

/*
 * Installs the tier-specific callbacks in conf->methods: destination
 * selection, the migration entry point (tier_start), the "is a migration
 * needed" predicate and the layout search.
 *
 * Returns 0 on success, -1 when `this` is NULL.
 */
int
tier_init_methods (xlator_t *this)
{
        dht_conf_t    *priv   = NULL;
        dht_methods_t *table  = NULL;
        int            status = -1;

        GF_VALIDATE_OR_GOTO ("tier", this, err);

        priv  = this->private;
        table = &priv->methods;

        table->layout_search            = tier_search;
        table->migration_needed         = tier_migration_needed;
        table->migration_other          = tier_start;
        table->migration_get_dst_subvol = tier_migration_get_dst;

        status = 0;
err:
        return status;
}


/*
 * Stores the volume name in defrag->tier_conf.volname.
 *
 * The name is this->name up to (not including) the first "-tier-dht", or
 * the whole of this->name when that marker is absent, truncated to at most
 * GD_VOLUME_NAME_MAX characters and NUL-terminated.
 */
static void
tier_save_vol_name (xlator_t *this)
{
        dht_conf_t       *priv   = this->private;
        gf_defrag_info_t *info   = priv->defrag;
        char             *marker = strstr (this->name, "-tier-dht");
        int               len    = 0;

        len = marker ? (int) (marker - this->name)
                     : (int) strlen (this->name);

        if (len > GD_VOLUME_NAME_MAX)
                len = GD_VOLUME_NAME_MAX;

        /* len never exceeds strlen (this->name), so strncpy copies exactly
         * len bytes and the terminator has to be written explicitly. */
        strncpy (info->tier_conf.volname, this->name, len);
        info->tier_conf.volname[len] = 0;
}

/*
 * Reads the int32 option `key` from this->options.  Returns the stored
 * value, or `fallback` when dict_get_int32() reports failure.
 */
static int
tier_get_int32_option (xlator_t *this, char *key, int fallback)
{
        int value = 0;

        if (dict_get_int32 (this->options, key, &value))
                value = fallback;

        return value;
}

/*
 * Translator init entry point.
 *
 * Runs dht_init, installs the tier method table and requires exactly two
 * subvolumes.  When conf->defrag is NULL nothing else is done.  Otherwise
 * the gfdb library is loaded, the tier options are read from this->options
 * (each falling back to its DEFAULT_* value), the pause state is set up,
 * the per-translator directory under DEFAULT_VAR_RUN_DIRECTORY is created,
 * the promote/demote query-file paths are built and the volume name is
 * recorded.
 *
 * Returns 0 on success and a non-zero value on failure.
 */
int
tier_init (xlator_t *this)
{
        int               ret            = -1;
        dht_conf_t       *conf           = NULL;
        gf_defrag_info_t *defrag         = NULL;
        char             *voldir         = NULL;
        char             *mode           = NULL;
        char             *paused         = NULL;
        gf_boolean_t      compact_mode   = _gf_false;

        ret = dht_init (this);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "tier_init failed");
                goto out;
        }

        conf = this->private;

        ret = tier_init_methods (this);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "tier_init_methods failed");
                goto out;
        }

        if (conf->subvolume_cnt != 2) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "Invalid number of subvolumes %d", conf->subvolume_cnt);
                /* ret still holds the 0 returned by tier_init_methods;
                 * without this the error would be reported as success. */
                ret = -1;
                goto out;
        }

        /* if instantiated from client side initialization is complete. */
        if (!conf->defrag) {
                ret = 0;
                goto out;
        }

        /* if instantiated from server side, load db libraries */
        ret = tier_load_externals (this);
        if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       DHT_MSG_LOG_TIER_ERROR,
                       "Could not load externals. Aborting");
                goto out;
        }

        defrag = conf->defrag;

        defrag->tier_conf.last_demote_qfile_index = 0;
        defrag->tier_conf.last_promote_qfile_index = 0;

        defrag->tier_conf.is_tier = 1;
        defrag->this = this;

        /* Integer options: stored value, or the default when unset. */
        defrag->tier_conf.tier_max_promote_size =
                tier_get_int32_option (this, "tier-max-promote-file-size", 0);

        defrag->tier_conf.tier_promote_frequency =
                tier_get_int32_option (this, "tier-promote-frequency",
                                       DEFAULT_PROMOTE_FREQ_SEC);

        defrag->tier_conf.tier_demote_frequency =
                tier_get_int32_option (this, "tier-demote-frequency",
                                       DEFAULT_DEMOTE_FREQ_SEC);

        defrag->tier_conf.tier_compact_hot_frequency =
                tier_get_int32_option (this, "tier-hot-compact-frequency",
                                       DEFAULT_HOT_COMPACT_FREQ_SEC);

        defrag->tier_conf.tier_compact_cold_frequency =
                tier_get_int32_option (this, "tier-cold-compact-frequency",
                                       DEFAULT_COLD_COMPACT_FREQ_SEC);

        defrag->tier_conf.watermark_hi =
                tier_get_int32_option (this, "watermark-hi", DEFAULT_WM_HI);

        defrag->tier_conf.watermark_low =
                tier_get_int32_option (this, "watermark-low", DEFAULT_WM_LOW);

        defrag->write_freq_threshold =
                tier_get_int32_option (this, "write-freq-threshold",
                                       DEFAULT_WRITE_FREQ_SEC);

        defrag->read_freq_threshold =
                tier_get_int32_option (this, "read-freq-threshold",
                                       DEFAULT_READ_FREQ_SEC);

        /* The option is expressed in MiB; widen before scaling to bytes. */
        defrag->tier_conf.max_migrate_bytes =
                (uint64_t) tier_get_int32_option (this, "tier-max-mb",
                                                  DEFAULT_TIER_MAX_MIGRATE_MB)
                * 1024 * 1024;

        defrag->tier_conf.max_migrate_files =
                tier_get_int32_option (this, "tier-max-files",
                                       DEFAULT_TIER_MAX_MIGRATE_FILES);

        defrag->tier_conf.query_limit =
                tier_get_int32_option (this, "tier-query-limit",
                                       DEFAULT_TIER_QUERY_LIMIT);

        ret = dict_get_str (this->options,
                            "tier-compact", &mode);
        if (ret) {
                defrag->tier_conf.compact_active = DEFAULT_COMP_MODE;
        } else {
                compact_mode = tier_validate_compact_mode (mode);
                /* If compaction is now active, we need to inform the bricks on
                   the hot and cold tier of this. See dht-common.h for more. */
                defrag->tier_conf.compact_active = compact_mode;
                if (compact_mode) {
                        defrag->tier_conf.compact_mode_switched_hot = _gf_true;
                        defrag->tier_conf.compact_mode_switched_cold = _gf_true;
                }
        }

        ret = dict_get_str (this->options,
                            "tier-mode", &mode);
        if (ret) {
                defrag->tier_conf.mode = DEFAULT_TIER_MODE;
        } else {
                defrag->tier_conf.mode = tier_validate_mode (mode);
        }

        pthread_mutex_init (&defrag->tier_conf.pause_mutex, 0);

        gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING);

        /* A missing "tier-pause" option is not an error: `paused` simply
         * stays NULL and the tier keeps running. */
        dict_get_str (this->options, "tier-pause", &paused);

        if (paused && strcmp (paused, "on") == 0)
                gf_defrag_set_pause_state (&defrag->tier_conf,
                                           TIER_REQUEST_PAUSE);

        ret = gf_asprintf(&voldir, "%s/%s",
                          DEFAULT_VAR_RUN_DIRECTORY,
                          this->name);
        if (ret < 0)
                goto out;

        ret = mkdir_p(voldir, 0777, _gf_true);
        if (ret == -1 && errno != EEXIST) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                       DHT_MSG_LOG_TIER_ERROR,
                       "tier_init failed");

                GF_FREE(voldir);
                goto out;
        }

        GF_FREE(voldir);

        ret = gf_asprintf (&promotion_qfile, "%s/%s/promote",
                           DEFAULT_VAR_RUN_DIRECTORY,
                           this->name);
        if (ret < 0)
                goto out;

        ret = gf_asprintf (&demotion_qfile, "%s/%s/demote",
                           DEFAULT_VAR_RUN_DIRECTORY,
                           this->name);
        if (ret < 0) {
                GF_FREE (promotion_qfile);
                /* promotion_qfile is file-scope and is freed again by
                 * tier_fini; do not leave it pointing at freed memory. */
                promotion_qfile = NULL;
                goto out;
        }

        gf_msg (this->name, GF_LOG_INFO, 0,
                DHT_MSG_LOG_TIER_STATUS,
               "Promote/demote frequency %d/%d "
               "Write/Read freq thresholds %d/%d",
               defrag->tier_conf.tier_promote_frequency,
               defrag->tier_conf.tier_demote_frequency,
               defrag->write_freq_threshold,
               defrag->read_freq_threshold);

        tier_save_vol_name (this);

        ret = 0;

out:

        return ret;
}

/*
 * Completion callback handed to synctask_new() together with
 * tier_cli_pause: logs the task's result and passes it through unchanged.
 * `sync_frame` and `data` are not used.
 */
int
tier_cli_pause_done (int op_ret, call_frame_t *sync_frame, void *data)
{
        (void) sync_frame;
        (void) data;

        gf_msg ("tier", GF_LOG_INFO, 0, DHT_MSG_TIER_PAUSED,
                "Migrate file paused with op_ret %d", op_ret);

        return op_ret;
}

/*
 * Synctask body used by tier_reconfigure to pause the tier.  `data` is the
 * tier xlator.
 *
 * Returns 0 once gf_defrag_pause_tier() has been called (its own result is
 * not inspected), or -1 when the xlator has no private data or no defrag
 * state.
 */
int
tier_cli_pause (void *data)
{
        xlator_t         *this   = data;
        dht_conf_t       *conf   = this->private;
        gf_defrag_info_t *defrag = NULL;
        int               status = -1;

        GF_VALIDATE_OR_GOTO (this->name, conf, exit);

        defrag = conf->defrag;
        GF_VALIDATE_OR_GOTO (this->name, defrag, exit);

        gf_defrag_pause_tier (this, defrag);
        status = 0;

exit:
        return status;
}

/*
 * Translator reconfigure entry point.
 *
 * When defrag state exists, the tier options are re-read from `options`
 * into defrag / defrag->tier_conf, a change of the "tier-compact" setting
 * raises both compact_mode_switched flags, and "tier-pause" either
 * schedules tier_cli_pause as a synctask (pause requested) or resumes the
 * tier (otherwise).
 *
 * Every path - including the jumps to the `out` label that is handed to
 * GF_OPTION_RECONF - finishes by calling dht_reconfigure() and returning
 * its result; failures of the pause/resume step are only logged.
 */
int
tier_reconfigure (xlator_t *this, dict_t *options)
{
        dht_conf_t       *conf                 = this->private;
        gf_defrag_info_t *defrag               = NULL;
        call_frame_t     *frame                = NULL;
        char             *mode                 = NULL;
        int               migrate_mb           = 0;
        int               ret                  = 0;
        gf_boolean_t      req_pause            = _gf_false;
        gf_boolean_t      compact_was_active   = _gf_false;

        /* Nothing tier-specific to reconfigure without defrag state. */
        if (!conf->defrag)
                goto out;

        defrag = conf->defrag;

        GF_OPTION_RECONF ("tier-max-promote-file-size",
                          defrag->tier_conf.tier_max_promote_size,
                          options, int32, out);

        GF_OPTION_RECONF ("tier-promote-frequency",
                          defrag->tier_conf.tier_promote_frequency,
                          options, int32, out);

        GF_OPTION_RECONF ("tier-demote-frequency",
                          defrag->tier_conf.tier_demote_frequency,
                          options, int32, out);

        GF_OPTION_RECONF ("write-freq-threshold",
                          defrag->write_freq_threshold, options,
                          int32, out);

        GF_OPTION_RECONF ("read-freq-threshold",
                          defrag->read_freq_threshold, options,
                          int32, out);

        GF_OPTION_RECONF ("watermark-hi",
                          defrag->tier_conf.watermark_hi, options,
                          int32, out);

        GF_OPTION_RECONF ("watermark-low",
                          defrag->tier_conf.watermark_low, options,
                          int32, out);

        /* Remember the old setting so a flip can be detected below. */
        compact_was_active = defrag->tier_conf.compact_active;

        GF_OPTION_RECONF ("tier-compact",
                          defrag->tier_conf.compact_active, options,
                          bool, out);

        if (compact_was_active != defrag->tier_conf.compact_active) {
                defrag->tier_conf.compact_mode_switched_hot = _gf_true;
                defrag->tier_conf.compact_mode_switched_cold = _gf_true;
                gf_msg (this->name, GF_LOG_INFO, 0,
                        DHT_MSG_LOG_TIER_STATUS,
                        "compact mode switched");
        }

        GF_OPTION_RECONF ("tier-hot-compact-frequency",
                          defrag->tier_conf.tier_compact_hot_frequency,
                          options, int32, out);

        GF_OPTION_RECONF ("tier-cold-compact-frequency",
                          defrag->tier_conf.tier_compact_cold_frequency,
                          options, int32, out);

        GF_OPTION_RECONF ("tier-mode",
                          mode, options,
                          str, out);
        defrag->tier_conf.mode = tier_validate_mode (mode);

        /* The option is expressed in MiB; widen before scaling to bytes. */
        GF_OPTION_RECONF ("tier-max-mb",
                          migrate_mb, options,
                          int32, out);
        defrag->tier_conf.max_migrate_bytes = (uint64_t) migrate_mb *
                                                1024 * 1024;

        GF_OPTION_RECONF ("tier-max-files",
                          defrag->tier_conf.max_migrate_files, options,
                          int32, out);

        GF_OPTION_RECONF ("tier-query-limit",
                          defrag->tier_conf.query_limit,
                          options, int32, out);

        GF_OPTION_RECONF ("tier-pause",
                          req_pause, options,
                          bool, out);

        if (req_pause != _gf_true) {
                ret = gf_defrag_resume_tier (this, defrag);
                if (ret) {
                        gf_msg (this->name, GF_LOG_ERROR, 0,
                                DHT_MSG_LOG_TIER_ERROR,
                                "resume tier failed on reconfigure");
                }
                goto out;
        }

        /* Pause requested: run tier_cli_pause as a synctask. */
        frame = create_frame (this, this->ctx->pool);
        if (!frame)
                goto out;

        frame->root->pid = GF_CLIENT_PID_DEFRAG;

        ret = synctask_new (this->ctx->env, tier_cli_pause,
                            tier_cli_pause_done, frame, this);
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, 0,
                        DHT_MSG_LOG_TIER_ERROR,
                        "pause tier failed on reconfigure");
        }

out:
        return dht_reconfigure (this, options);
}

/*
 * Translator fini entry point: closes the gfdb library handle, frees the
 * promote/demote query-file paths and then runs dht_fini.
 *
 * The three file-scope variables are reset to NULL after release so that
 * no stale handle or freed pointer remains visible to any later
 * tier_fini or tier_init call in the same process.
 */
void
tier_fini (xlator_t *this)
{
        if (libhandle) {
                dlclose (libhandle);
                libhandle = NULL;
        }

        GF_FREE (demotion_qfile);
        demotion_qfile = NULL;

        GF_FREE (promotion_qfile);
        promotion_qfile = NULL;

        dht_fini(this);
}

/*
 * Lifecycle method table for the tier translator.  init, fini and
 * reconfigure are the tier_* functions defined in this file; notify
 * reuses dht_notify.
 */
class_methods_t class_methods = {
        .init           = tier_init,
        .fini           = tier_fini,
        .reconfigure    = tier_reconfigure,
        .notify         = dht_notify
};

/*
 * File-operation table for the tier translator.  Only create, statfs,
 * readdir, readdirp, unlink and link point at tier_* implementations
 * (defined outside this chunk); every other entry reuses the dht_*
 * handler of the same name.
 */
struct xlator_fops fops = {

        /* Namespace, directory and entry-lock operations */
        .lookup      = dht_lookup,
        .create      = tier_create,
        .mknod       = dht_mknod,

        .open        = dht_open,
        .statfs      = tier_statfs,
        .opendir     = dht_opendir,
        .readdir     = tier_readdir,
        .readdirp    = tier_readdirp,
        .fsyncdir    = dht_fsyncdir,
        .symlink     = dht_symlink,
        .unlink      = tier_unlink,
        .link        = tier_link,
        .mkdir       = dht_mkdir,
        .rmdir       = dht_rmdir,
        .rename      = dht_rename,
        .entrylk     = dht_entrylk,
        .fentrylk    = dht_fentrylk,

        /* Inode read operations (this group also holds flush, fsync and
         * the inode/posix lock fops) */
        .stat        = dht_stat,
        .fstat       = dht_fstat,
        .access      = dht_access,
        .readlink    = dht_readlink,
        .getxattr    = dht_getxattr,
        .fgetxattr    = dht_fgetxattr,
        .readv       = dht_readv,
        .flush       = dht_flush,
        .fsync       = dht_fsync,
        .inodelk     = dht_inodelk,
        .finodelk    = dht_finodelk,
        .lk          = dht_lk,

        /* Inode write operations */
        .fremovexattr = dht_fremovexattr,
        .removexattr = dht_removexattr,
        .setxattr    = dht_setxattr,
        .fsetxattr   = dht_fsetxattr,
        .truncate    = dht_truncate,
        .ftruncate   = dht_ftruncate,
        .writev      = dht_writev,
        .xattrop     = dht_xattrop,
        .fxattrop    = dht_fxattrop,
        .setattr     = dht_setattr,
        .fsetattr    = dht_fsetattr,
        .fallocate   = dht_fallocate,
        .discard     = dht_discard,
        .zerofill    = dht_zerofill,
};

/*
 * Callback table for the tier translator: release and forget both reuse
 * the dht_* handlers.
 */
struct xlator_cbks cbks = {
        .release    = dht_release,
        .forget     = dht_forget
};
